---
 bin/varnishd/cache/cache.h          |    5 ++
 bin/varnishd/cache/cache_center.c   |   96 +++++++++++++++++++++++++++++-----
 bin/varnishd/cache/cache_response.c |   84 ++++++++++++++++++++++--------
 3 files changed, 149 insertions(+), 36 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index da06965..880f5e3 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -537,6 +537,7 @@ struct busyobj {
        unsigned                do_gzip;
        unsigned                do_gunzip;
        unsigned                do_stream;
+       unsigned                do_stream_flipflop;
 
        /* Stream stuff */
        ssize_t                 stream_max;
@@ -793,6 +794,8 @@ int FetchError(struct worker *w, const char *error);
 int FetchError2(struct worker *w, const char *error, const char *more);
 int FetchHdr(struct sess *sp, int need_host_hdr);
 int FetchBody(struct worker *w, struct busyobj *bo);
+void FetchBodyBackground(struct sess *sp, struct busyobj *bo);
+void FetchBodyWait(struct busyobj *bo);
 int FetchReqBody(const struct sess *sp);
 void Fetch_Init(void);
 
@@ -999,6 +1002,8 @@ void WSL_Flush(struct worker *w, int overflow);
 void RES_BuildHttp(const struct sess *sp);
 void RES_WriteObj(struct sess *sp);
 void RES_StreamStart(struct sess *sp);
+void RES_StreamBody(struct sess *sp);
+void RES_StreamWrite(struct sess *sp);
 void RES_StreamEnd(struct sess *sp);
 void RES_StreamPoll(struct worker *wrk);
 
diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c
index 386012c..1f1a239 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -963,13 +963,47 @@ DOT }
 DOT streambody -> DONE [style=bold,color=cyan]
  */
 
+/* Background fetch task. priv is the busyobj; the caller must hold a ref on
+   it, and on the objcore if present, on behalf of this task */
+
+static void
+cnt_streambody_task(struct worker *wrk, void *priv)
+{
+       struct object *obj;
+       struct objcore *objcore;
+       unsigned u;
+
+       AZ(wrk->busyobj);
+       CAST_OBJ_NOTNULL(wrk->busyobj, priv, BUSYOBJ_MAGIC);
+       AN(wrk->busyobj->use_locks);
+
+       CHECK_OBJ_NOTNULL(wrk->busyobj->fetch_obj, OBJECT_MAGIC);
+       AN(wrk->busyobj->vbc);
+       obj = wrk->busyobj->fetch_obj; /* kept: fetch_obj is NULL after fetch */
+       objcore = obj->objcore;
+
+       wrk->busyobj->fetch_failed = FetchBody(wrk, wrk->busyobj);
+       AZ(wrk->busyobj->fetch_obj);
+       AZ(wrk->busyobj->vbc);
+       wrk->busyobj->vfp = NULL;
+       VBO_StreamStopped(wrk->busyobj);
+
+       u = VBO_DerefBusyObj(wrk, &wrk->busyobj);
+       if (objcore != NULL || u == 0) {
+               /* Only deref object if it has its own refcnt, or we
+                * were the last to deref the busyobj */
+               (void)HSH_Deref(wrk, NULL, &obj);
+       }
+}
+
 static int
 cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
 {
-       int i;
        struct stream_ctx sctx;
        uint8_t obuf[sp->wrk->res_mode & RES_GUNZIP ?
            cache_param->gzip_stack_buffer : 1];
+       struct worker *wrk_ex;
+       unsigned u;
 
        CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
        CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
@@ -987,28 +1021,56 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
                sctx.obuf_len = sizeof (obuf);
        }
 
-       RES_StreamStart(sp);
-
        AssertObjCorePassOrBusy(req->obj->objcore);
+       RES_StreamStart(sp);
 
+       /* MBGXXX: Test on OC_F_BUSY to see if we should initiate
+        * fetch at all. This code now assumes all passes through here
+        * need to do the fetch as well. (Multiple streaming clients
+        * not implemented yet) */
        AZ(wrk->busyobj->fetch_obj);
        wrk->busyobj->fetch_obj = req->obj;
-       i = FetchBody(wrk, wrk->busyobj);
-       AZ(wrk->busyobj->fetch_obj);
-
        http_Setup(wrk->busyobj->bereq, NULL);
        http_Setup(wrk->busyobj->beresp, NULL);
-       wrk->busyobj->vfp = NULL;
-       AZ(wrk->busyobj->vbc);
-       AN(req->director);
+       wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable
+                                          * thread grabbing
+                                          * timeout */
+       if (wrk_ex != NULL) {
+               /* Set up separate thread fetch */
+               wrk->busyobj->use_locks = 1;
+               if (req->obj->objcore != NULL)
+                       /* Grab a ref on the objcore for the other thread */
+                       HSH_Ref(req->obj->objcore);
+               VBO_RefBusyObj(wrk->busyobj); /* Ref for the other thread */
+               WRK_DoTask(wrk_ex, cnt_streambody_task, wrk->busyobj);
+       } else {
+               /* We have no worker */
+               if (wrk->busyobj->fetch_obj->objcore == NULL ||
+                   wrk->busyobj->fetch_obj->objcore->flags & OC_F_PASS) {
+                       /* It's a pass, prefer flipflop
+                        * streaming. (MBGXXX: Flipflop not finished
+                        * yet) */
+                       wrk->busyobj->do_stream_flipflop = 1;
+               }
+               wrk->busyobj->fetch_failed = FetchBody(sp->wrk, wrk->busyobj);
+               VBO_StreamStopped(wrk->busyobj);
+       }
 
-       if (!i && req->obj->objcore != NULL) {
+       RES_StreamBody(sp);
+
+       if (wrk->busyobj->htc.ws == wrk->ws)
+               /* Busyobj's htc has buffer on our workspace,
+                  wait for it to be released */
+               VBO_StreamWait(wrk->busyobj);
+
+       if (wrk->busyobj->fetch_failed) {
+               req->doclose = "Stream error";
+       } else if (req->obj->objcore != NULL) {
+               /* MBGXXX: This should be done on the bg task */
                EXP_Insert(req->obj);
                AN(req->obj->objcore);
                AN(req->obj->objcore->ban);
                HSH_Unbusy(wrk);
-       } else {
-               req->doclose = "Stream error";
        }
        wrk->acct_tmp.fetch++;
        req->director = NULL;
@@ -1021,8 +1083,14 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
        wrk->sctx = NULL;
        assert(WRW_IsReleased(wrk));
        assert(wrk->wrw.ciov == wrk->wrw.siov);
-       (void)HSH_Deref(wrk, NULL, &req->obj);
-       (void)VBO_DerefBusyObj(wrk, &wrk->busyobj);
+       u = VBO_DerefBusyObj(wrk, &wrk->busyobj);
+       if (req->obj->objcore != NULL || u == 0) {
+               /* Only deref object if it has its own refcnt, or we
+                * were the last to deref the busyobj */
+               (void)HSH_Deref(wrk, NULL, &req->obj);
+       } else
+               /* Object will be deref'ed by fetch thread */
+               req->obj = NULL;
        http_Setup(req->resp, NULL);
        sp->step = STP_DONE;
        return (0);
diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c
index cb6ddd6..a49acbe 100644
--- a/bin/varnishd/cache/cache_response.c
+++ b/bin/varnishd/cache/cache_response.c
@@ -366,22 +366,39 @@ RES_StreamStart(struct sess *sp)
 }
 
 void
-RES_StreamPoll(struct worker *wrk)
+RES_StreamBody(struct sess *sp)
 {
        struct stream_ctx *sctx;
+       struct busyobj *bo;
+
+       sctx = sp->wrk->sctx;
+       CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
+       bo = sp->wrk->busyobj;
+       CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
+       AN(sp->req->wantbody);
+
+       while (!sctx->stream_stopped || sctx->stream_next < sctx->stream_max)  {
+               VBO_StreamSync(sp->wrk);
+               RES_StreamWrite(sp);
+       }
+}
+
+void
+RES_StreamWrite(struct sess *sp)
+{
+       struct worker *wrk;
+       struct stream_ctx *sctx;
        struct storage *st;
-       struct object *fetch_obj;
        ssize_t l, l2, stlen;
        void *ptr;
 
+       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+       wrk = sp->wrk;
        CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
        CHECK_OBJ_NOTNULL(wrk->sp->req->obj, OBJECT_MAGIC);
        sctx = wrk->sctx;
        CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
 
-       VBO_StreamData(wrk->busyobj);
-       VBO_StreamSync(wrk);
-
        if (sctx->stream_max == sctx->stream_next)
                return;
        assert(sctx->stream_max > sctx->stream_next);
@@ -418,23 +435,6 @@ RES_StreamPoll(struct worker *wrk)
        }
        if (!(wrk->res_mode & RES_GUNZIP))
                (void)WRW_Flush(wrk);
-
-       if (wrk->busyobj->stream_frontchunk == NULL)
-               return;
-
-       /* It's a pass - remove chunks already delivered */
-       fetch_obj = wrk->busyobj->fetch_obj;
-       CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
-       assert(fetch_obj->objcore == NULL ||
-              (fetch_obj->objcore->flags & OC_F_PASS));
-       while (1) {
-               st = VTAILQ_FIRST(&fetch_obj->store);
-               if (st == NULL || st == wrk->busyobj->stream_frontchunk)
-                       break;
-               CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
-               VTAILQ_REMOVE(&fetch_obj->store, st, list);
-               STV_free(st);
-       }
 }
 
 void
@@ -453,3 +453,43 @@ RES_StreamEnd(struct sess *sp)
        if (WRW_FlushRelease(sp->wrk))
                SES_Close(sp, "remote closed");
 }
+
+void
+RES_StreamPoll(struct worker *wrk)
+{
+       struct object *fetch_obj;
+       struct storage *st;
+
+       CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+
+       VBO_StreamData(wrk->busyobj);
+       if (wrk->busyobj->do_stream_flipflop == 1) {
+               AN(wrk->sctx);
+               /* MBGXXX: Do flip-flop streaming */
+               /* MBGXXX: Loop around waiting for the lag behind to
+                * be less than some configurable size, to keep the
+                * cache memory usage low (this for streaming
+                * extremely large objects with pass) */
+               VBO_StreamSync(wrk);
+               RES_StreamWrite(wrk->sp);
+       }
+
+       if (wrk->busyobj->stream_frontchunk == NULL)
+               return;
+
+       /* It's a pass - remove chunks already delivered. Should be OK
+        * to do lock-free, as we are not fiddling pointers of any
+        * storage chunk past busyobj->stream_frontchunk */
+       fetch_obj = wrk->busyobj->fetch_obj;
+       CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
+       assert(fetch_obj->objcore == NULL ||
+              (fetch_obj->objcore->flags & OC_F_PASS));
+       while (1) {
+               st = VTAILQ_FIRST(&fetch_obj->store);
+               if (st == NULL || st == wrk->busyobj->stream_frontchunk)
+                       break;
+               CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
+               VTAILQ_REMOVE(&fetch_obj->store, st, list);
+               STV_free(st);
+       }
+}
-- 
1.7.4.1


_______________________________________________
varnish-dev mailing list
varnish-dev@varnish-cache.org
https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev

Reply via email to