Scott Lamb wrote:
(4) Detect while streaming that the client connection has been lost, so that we know to stop sending chunks. Patch attached to return error code in evhttp_send_reply_chunk(). (Wouldn't hurt to add error returns in other places, but this is my immediate need.)

Hmm, my patch was still appending to the buffer if the client doesn't pay attention; that's probably not ideal either.

(6) Flow control (for the --limit-rate case). I need to keep the outgoing connection input buffer, outgoing request input buffer, incoming request output buffer, and incoming connection output buffer from growing to be ridiculous. I'm considering something like this:

* define read watermarks on requests; it will only read from the outgoing connection when the input buffers are below the high watermark (default of ~0 to preserve existing behavior), and it will only call the chunk callback when the input buffers are above the low watermark (default of 0).

* define an option to drain the input buffer after calling the chunk callback, defaulting to 1 for existing behavior. I'd want to turn it off so I can transfer bytes only if the output buffers are not full.

* define output watermarks as well. If below low watermark (defaulting to 0), call a fill function which in my case would also transfer from input buffers.

I'm not too attached to this plan. I keep thinking there might be a simpler way, but I don't see it.

Hmm, here's a simpler approach (evhttp-flowcontrol.patch). Opinions?

Flow control for evhttp_request

* evhttp_request_set_output_watermarks():
  - call "fill" when "output buffer size <= low watermark" becomes true
  - call "drain" when "output buffer size >= high watermark" becomes true

* evhttp_request_{pause,resume}():
  Pause and resume reading from the socket into the input buffer

My client<->proxy request's fill and drain callbacks resume and pause the proxy<->server request, respectively.

I'm wondering if there's any point in input watermarks here or for bufferevent either (remove wm_input? there doesn't seem to be an API yet). The idea of the downstream thing toggling it seems simpler.

Preliminary version of all patches attached. Still don't have anything for (5). These apply in this order:

+ evhttp-64bit-length.patch
+ evhttp-stream-in.patch
+ evhttp-nonchunked-streaming.patch
+ evhttp-detect-failure.patch
> evhttp-flowcontrol.patch

Best regards,
Scott

--
Scott Lamb <http://www.slamb.org/>
evhttp: stream incoming responses, even if not chunked

From: Scott Lamb <[EMAIL PROTECTED]>

Signed-off-by: Scott Lamb <[EMAIL PROTECTED]>
---

 libevent/http.c |   19 ++++++++++++++-----
 1 files changed, 14 insertions(+), 5 deletions(-)


diff --git a/libevent/http.c b/libevent/http.c
index 500a991..d4d0ce6 100644
--- a/libevent/http.c
+++ b/libevent/http.c
@@ -730,11 +730,6 @@ evhttp_handle_chunked_read(struct evhttp_request *req, struct evbuffer *buf)
                    EVBUFFER_DATA(buf), req->ntoread);
                evbuffer_drain(buf, req->ntoread);
                req->ntoread = -1;
-               if (req->chunk_cb != NULL) {
-                       (*req->chunk_cb)(req, req->cb_arg);
-                       evbuffer_drain(req->input_buffer,
-                           EVBUFFER_LENGTH(req->input_buffer));
-               }
        }
 
        return (0);
@@ -768,7 +763,21 @@ evhttp_read_body(struct evhttp_connection *evcon, struct evhttp_request *req)
                req->ntoread = 0;
                evhttp_connection_done(evcon);
                return;
+       } else if (req->chunk_cb != NULL) {
+               /*
+                * The condition above is simply for efficiency; we do the
+                * move at the end if we aren't streaming.
+                */
+               req->ntoread -= EVBUFFER_LENGTH(buf);
+               evbuffer_add_buffer(req->input_buffer, buf);
        }
+
+       if (EVBUFFER_LENGTH(req->input_buffer) > 0 && req->chunk_cb != NULL) {
+               (*req->chunk_cb)(req, req->cb_arg);
+               evbuffer_drain(req->input_buffer,
+                   EVBUFFER_LENGTH(req->input_buffer));
+       }
+
        /* Read more! */
        event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read, evcon);
        EVHTTP_BASE_SET(evcon, &evcon->ev);
evhttp: use a 64-bit Content-Length: counter even on 32-bit platforms

From: Scott Lamb <[EMAIL PROTECTED]>

Signed-off-by: Scott Lamb <[EMAIL PROTECTED]>
---

 libevent/evhttp.h |    2 +-
 libevent/http.c   |    4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)


diff --git a/libevent/evhttp.h b/libevent/evhttp.h
index 20a33b9..a47a2f0 100644
--- a/libevent/evhttp.h
+++ b/libevent/evhttp.h
@@ -204,7 +204,7 @@ struct {
        char *response_code_line;       /* Readable response */
 
        struct evbuffer *input_buffer;  /* read data */
-       int ntoread;
+       int64_t ntoread;
        int chunked;
 
        struct evbuffer *output_buffer; /* outgoing post or data */
diff --git a/libevent/http.c b/libevent/http.c
index 712a899..500a991 100644
--- a/libevent/http.c
+++ b/libevent/http.c
@@ -707,7 +707,7 @@ evhttp_handle_chunked_read(struct evhttp_request *req, struct evbuffer *buf)
                                event_free(p);
                                continue;
                        }
-                       req->ntoread = strtol(p, &endp, 16);
+                       req->ntoread = strtoll(p, &endp, 16);
                        error = *p == '\0' || (*endp != '\0' && *endp != ' ');
                        event_free(p);
                        if (error) {
@@ -1321,7 +1321,7 @@ evhttp_get_body_length(struct evhttp_request *req)
                req->ntoread = -1;
        } else {
                char *endp;
-               req->ntoread = strtol(content_length, &endp, 10);
+               req->ntoread = strtoll(content_length, &endp, 10);
                if (*content_length == '\0' || *endp != '\0') {
                        event_warnx("%s: illegal content length: %s",
                            __func__, content_length);
Flow control for evhttp_request

From: Scott Lamb <[EMAIL PROTECTED]>

* evhttp_request_set_output_watermarks():
  - call "fill" when "output buffer size <= low watermark" becomes true
  - call "drain" when "output buffer size >= high watermark" becomes true

* evhttp_request_{pause,resume}():
  Pause and resume reading from the socket into the input buffer

Signed-off-by: Scott Lamb <[EMAIL PROTECTED]>
---

 libevent/evhttp.h |   25 +++++++++++++++++++++++++
 libevent/http.c   |   54 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 78 insertions(+), 1 deletions(-)


diff --git a/libevent/evhttp.h b/libevent/evhttp.h
index 349cf95..ab1ad75 100644
--- a/libevent/evhttp.h
+++ b/libevent/evhttp.h
@@ -139,6 +139,18 @@ void evhttp_send_error(struct evhttp_request *req, int error,
 void evhttp_send_reply(struct evhttp_request *req, int code,
     const char *reason, struct evbuffer *databuf);
 
+/**
+ * Manipulate watermarks for flow control.
+ * - Calls "fill_cb" with "filldrain_arg" when "output buffer size <=
+ *   low watermark" becomes true.
+ * - Calls "drain_cb" with "filldrain_arg" when "output buffer size >=
+ *   high watermark" becomes true.
+ */
+void evhttp_request_set_output_watermarks(struct evhttp_request *,
+    size_t low_watermark, void (*fill_cb)(struct evhttp_request *, void *),
+    size_t high_watermark, void (*drain_cb)(struct evhttp_request *, void *),
+    void *filldrain_arg);
+
 /* Low-level response interface, for streaming/chunked replies */
 void evhttp_send_reply_start(struct evhttp_request *, int, const char *);
 int evhttp_send_reply_chunk(struct evhttp_request *, struct evbuffer *);
@@ -183,6 +195,7 @@ struct {
 #define EVHTTP_REQ_OWN_CONNECTION      0x0001
 #define EVHTTP_PROXY_REQUEST           0x0002
 #define EVHTTP_USER_OWNED              0x0004
+#define EVHTTP_REQ_PAUSED              0x0008
 
        struct evkeyvalq *input_headers;
        struct evkeyvalq *output_headers;
@@ -208,6 +221,7 @@ struct {
        int chunked;
 
        struct evbuffer *output_buffer; /* outgoing post or data */
+       struct event_watermark wm_output;
 
        /* Callback */
        void (*cb)(struct evhttp_request *, void *);
@@ -219,6 +233,14 @@ struct {
         * the regular callback.
         */
        void (*chunk_cb)(struct evhttp_request *, void *);
+
+       /*
+        * Fill and drain watermarks - call to maintain consistent
+        * buffer size.
+        */
+       void (*fill_cb)(struct evhttp_request *, void *);
+       void (*drain_cb)(struct evhttp_request *, void *);
+       void *filldrain_arg;
 };
 
 /**
@@ -229,6 +251,9 @@ struct {
 struct evhttp_request *evhttp_request_new(
        void (*cb)(struct evhttp_request *, void *), void *arg);
 
+void evhttp_request_pause(struct evhttp_request *);
+void evhttp_request_resume(struct evhttp_request *);
+
 /** enable delivery of chunks to requestor */
 void evhttp_request_set_chunked_cb(struct evhttp_request *,
     void (*cb)(struct evhttp_request *, void *));
diff --git a/libevent/http.c b/libevent/http.c
index 52913bd..dfe174a 100644
--- a/libevent/http.c
+++ b/libevent/http.c
@@ -606,6 +606,7 @@ void
 evhttp_write(evutil_socket_t fd, short what, void *arg)
 {
        struct evhttp_connection *evcon = arg;
+       struct evhttp_request* req = TAILQ_FIRST(&evcon->requests);
        int n;
 
        if (what == EV_TIMEOUT) {
@@ -626,6 +627,10 @@ evhttp_write(evutil_socket_t fd, short what, void *arg)
                return;
        }
 
+       if (EVBUFFER_LENGTH(req->evcon->output_buffer) <=
+           req->wm_output.low && req->fill_cb != NULL)
+               req->fill_cb(req, req->filldrain_arg);
+
        if (EVBUFFER_LENGTH(evcon->output_buffer) != 0) {
                evhttp_add_event(&evcon->ev, 
                    evcon->timeout, HTTP_WRITE_TIMEOUT);
@@ -789,7 +794,8 @@ evhttp_read_body(struct evhttp_connection *evcon, struct evhttp_request *req)
        /* Read more! */
        event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read, evcon);
        EVHTTP_BASE_SET(evcon, &evcon->ev);
-       evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
+       if ((req->flags & EVHTTP_REQ_PAUSED) == 0)
+               evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
 }
 
 /*
@@ -1755,6 +1761,11 @@ evhttp_send_reply_chunk(struct evhttp_request *req, struct evbuffer *databuf)
        }
        if (evbuffer_add_buffer(req->evcon->output_buffer, databuf) < 0)
                return (-1);
+
+       if (EVBUFFER_LENGTH(req->evcon->output_buffer) >=
+           req->wm_output.high && req->drain_cb != NULL)
+               req->drain_cb(req, req->filldrain_arg);
+
        return (evhttp_write_buffer(req->evcon, NULL, NULL));
 }
 
@@ -2255,12 +2266,53 @@ evhttp_request_is_owned(struct evhttp_request *req)
 }
 
 void
+evhttp_request_pause(struct evhttp_request *req)
+{
+       req->flags |= EVHTTP_REQ_PAUSED;
+}
+
+void
+evhttp_request_resume(struct evhttp_request *req)
+{
+       struct evhttp_connection *evcon = req->evcon;
+
+       if (req->evcon != NULL && (req->flags & EVHTTP_REQ_PAUSED) != 0)
+               evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
+       req->flags &= ~EVHTTP_REQ_PAUSED;
+
+}
+
+void
 evhttp_request_set_chunked_cb(struct evhttp_request *req,
     void (*cb)(struct evhttp_request *, void *))
 {
        req->chunk_cb = cb;
 }
 
+void
+evhttp_request_set_output_watermarks(struct evhttp_request *req,
+    size_t low_watermark, void (*fill_cb)(struct evhttp_request *, void *),
+    size_t high_watermark, void (*drain_cb)(struct evhttp_request *, void *),
+    void *filldrain_arg)
+{
+       req->wm_output.low = low_watermark;
+       req->wm_output.high = high_watermark;
+       req->fill_cb = fill_cb;
+       req->drain_cb = drain_cb;
+       req->filldrain_arg = filldrain_arg;
+
+       if (req->evcon && req->evcon->output_buffer) {
+               /* See if we need to call them immediately. */
+               size_t len = EVBUFFER_LENGTH(req->evcon->output_buffer);
+
+               if (len <= low_watermark)
+                       fill_cb(req, filldrain_arg);
+               if (len >= high_watermark)
+                       drain_cb(req, filldrain_arg);
+       }
+}
+
+
 /*
  * Allows for inspection of the request URI
  */
evhttp: stream large responses without requiring chunking

From: Scott Lamb <[EMAIL PROTECTED]>

Signed-off-by: Scott Lamb <[EMAIL PROTECTED]>
---

 libevent/http.c |    9 +++++++--
 1 files changed, 7 insertions(+), 2 deletions(-)


diff --git a/libevent/http.c b/libevent/http.c
index d4d0ce6..07426b9 100644
--- a/libevent/http.c
+++ b/libevent/http.c
@@ -1719,8 +1719,13 @@ evhttp_send_reply_start(struct evhttp_request *req, int code,
        /* set up to watch for client close */
        evhttp_connection_start_detectclose(req->evcon);
        evhttp_response_code(req, code, reason);
-       if (req->major == 1 && req->minor == 1) {
-               /* use chunked encoding for HTTP/1.1 */
+       if (evhttp_find_header(req->output_headers, "Content-Length") == NULL
+           && req->major == 1 && req->minor == 1) {
+               /*
+                * prefer HTTP/1.1 chunked encoding to closing the connection;
+                * note RFC 2616 section 4.4 forbids it with Content-Length:
+                * and it's not necessary then anyway.
+                */
                evhttp_add_header(req->output_headers, "Transfer-Encoding",
                    "chunked");
                req->chunked = 1;
evhttp: return success or failure from evhttp_send_reply_chunk()

From: Scott Lamb <[EMAIL PROTECTED]>

Callers can use this error return to abort streaming response generation.

Signed-off-by: Scott Lamb <[EMAIL PROTECTED]>
---

 libevent/evhttp.h        |    5 ++++-
 libevent/http-internal.h |    2 +-
 libevent/http.c          |   36 ++++++++++++++++++++++++++----------
 3 files changed, 31 insertions(+), 12 deletions(-)


diff --git a/libevent/evhttp.h b/libevent/evhttp.h
index a47a2f0..349cf95 100644
--- a/libevent/evhttp.h
+++ b/libevent/evhttp.h
@@ -141,7 +141,7 @@ void evhttp_send_reply(struct evhttp_request *req, int code,
 
 /* Low-level response interface, for streaming/chunked replies */
 void evhttp_send_reply_start(struct evhttp_request *, int, const char *);
-void evhttp_send_reply_chunk(struct evhttp_request *, struct evbuffer *);
+int evhttp_send_reply_chunk(struct evhttp_request *, struct evbuffer *);
 void evhttp_send_reply_end(struct evhttp_request *);
 
 /**
@@ -233,6 +233,9 @@ struct evhttp_request *evhttp_request_new(
 void evhttp_request_set_chunked_cb(struct evhttp_request *,
     void (*cb)(struct evhttp_request *, void *));
 
+/** Aborts a request, killing its connection if active. */
+void evhttp_request_abort(struct evhttp_request *req);
+
 /** Frees the request object and removes associated events. */
 void evhttp_request_free(struct evhttp_request *req);
 
diff --git a/libevent/http-internal.h b/libevent/http-internal.h
index f170865..6aa405f 100644
--- a/libevent/http-internal.h
+++ b/libevent/http-internal.h
@@ -123,7 +123,7 @@ void evhttp_start_read(struct evhttp_connection *);
 void evhttp_read_header(evutil_socket_t, short, void *);
 void evhttp_make_header(struct evhttp_connection *, struct evhttp_request *);
 
-void evhttp_write_buffer(struct evhttp_connection *,
+int evhttp_write_buffer(struct evhttp_connection *,
     void (*)(struct evhttp_connection *, void *), void *);
 
 /* response sending HTML the data in the buffer */
diff --git a/libevent/http.c b/libevent/http.c
index 07426b9..52913bd 100644
--- a/libevent/http.c
+++ b/libevent/http.c
@@ -268,7 +268,7 @@ evhttp_method(enum evhttp_cmd_type type)
        return (method);
 }
 
-static void
+static int
 evhttp_add_event(struct event *ev, int timeout, int default_timeout)
 {
        if (timeout != 0) {
@@ -276,18 +276,21 @@ evhttp_add_event(struct event *ev, int timeout, int default_timeout)
                
                evutil_timerclear(&tv);
                tv.tv_sec = timeout != -1 ? timeout : default_timeout;
-               event_add(ev, &tv);
+               return (event_add(ev, &tv));
        } else {
-               event_add(ev, NULL);
+               return (event_add(ev, NULL));
        }
 }
 
-void
+int
 evhttp_write_buffer(struct evhttp_connection *evcon,
     void (*cb)(struct evhttp_connection *, void *), void *arg)
 {
        event_debug(("%s: preparing to write buffer\n", __func__));
 
+       if (evcon->fd == -1)
+               return (-1);
+
        /* Set call back */
        evcon->cb = cb;
        evcon->cb_arg = arg;
@@ -298,7 +301,12 @@ evhttp_write_buffer(struct evhttp_connection *evcon,
 
        event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_write, evcon);
        EVHTTP_BASE_SET(evcon, &evcon->ev);
-       evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_WRITE_TIMEOUT);
+
+       if (evhttp_add_event(&evcon->ev, evcon->timeout,
+                            HTTP_WRITE_TIMEOUT) < 0)
+               return (-1);
+
+       return (0);
 }
 
 /*
@@ -1734,15 +1742,20 @@ evhttp_send_reply_start(struct evhttp_request *req, int code,
        evhttp_write_buffer(req->evcon, NULL, NULL);
 }
 
-void
+int
 evhttp_send_reply_chunk(struct evhttp_request *req, struct evbuffer *databuf)
 {
+       if (req->evcon->state == EVCON_DISCONNECTED)
+               return (-1);
+
        if (req->chunked) {
-               evbuffer_add_printf(req->evcon->output_buffer, "%x\r\n",
-                                   (unsigned)EVBUFFER_LENGTH(databuf));
+               if (evbuffer_add_printf(req->evcon->output_buffer, "%x\r\n",
+                                       (unsigned)EVBUFFER_LENGTH(databuf)) < 0)
+                       return (-1);
        }
-       evbuffer_add_buffer(req->evcon->output_buffer, databuf);
-       evhttp_write_buffer(req->evcon, NULL, NULL);
+       if (evbuffer_add_buffer(req->evcon->output_buffer, databuf) < 0)
+               return (-1);
+       return (evhttp_write_buffer(req->evcon, NULL, NULL));
 }
 
 void
@@ -2204,6 +2217,9 @@ evhttp_request_new(void (*cb)(struct evhttp_request *, void *), void *arg)
 void
 evhttp_request_free(struct evhttp_request *req)
 {
+       if (req == NULL)
+               return;
+
        if (req->remote_host != NULL)
                event_free(req->remote_host);
        if (req->uri != NULL)
_______________________________________________
Libevent-users mailing list
Libevent-users@monkey.org
http://monkeymail.org/mailman/listinfo/libevent-users

Reply via email to