---
 bin/varnishd/cache/cache.h          |    3 +-
 bin/varnishd/cache/cache_busyobj.c  |    5 ++-
 bin/varnishd/cache/cache_response.c |   67 +++++++++++++++++++++-------------
 3 files changed, 46 insertions(+), 29 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index fe65dbd..a64a0ae 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -541,7 +541,8 @@ struct busyobj {
 
        /* Stream stuff */
        ssize_t                 stream_max;
-       struct storage          *stream_frontchunk;
+       volatile ssize_t        stream_next;
+       volatile struct storage *stream_frontchunk;
        unsigned                stream_stopped;
        ssize_t                 stream_pass_bufsize;
 };
diff --git a/bin/varnishd/cache/cache_busyobj.c 
b/bin/varnishd/cache/cache_busyobj.c
index b39c170..8b97d4a 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -295,14 +295,15 @@ VBO_StreamSync(struct worker *wrk)
            (wrk->sp->req->obj->objcore->flags & OC_F_PASS)) {
                /* Give notice to backend fetch that we are finished
                 * with all chunks before this one */
+               assert(busyobj->stream_next <= sctx->stream_next);
+               busyobj->stream_next = sctx->stream_next;
                busyobj->stream_frontchunk = sctx->stream_frontchunk;
        }
 
        sctx->stream_stopped = busyobj->stream_stopped;
        sctx->stream_max = busyobj->stream_max;
 
-       if (busyobj->use_locks && !sctx->stream_stopped &&
-           sctx->stream_next == sctx->stream_max) {
+       if (busyobj->use_locks && sctx->stream_next == sctx->stream_max) {
                while (!busyobj->stream_stopped &&
                       sctx->stream_max == busyobj->stream_max) {
                        Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx,
diff --git a/bin/varnishd/cache/cache_response.c 
b/bin/varnishd/cache/cache_response.c
index 6bd4da7..bc7f122 100644
--- a/bin/varnishd/cache/cache_response.c
+++ b/bin/varnishd/cache/cache_response.c
@@ -459,36 +459,51 @@ RES_StreamPoll(struct worker *wrk)
 {
        struct object *fetch_obj;
        struct storage *st;
+       struct busyobj *bo;
+       int pass = 0;
 
-       CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
-
-       VBO_StreamData(wrk->busyobj);
-       if (wrk->busyobj->do_stream_flipflop == 1) {
-               AN(wrk->sctx);
-               /* MBGXXX: Loop around waiting for the lag behind to
-                * be less than some configurable size, to keep the
-                * cache memory usage low (this for streaming
-                * extremely large objects with pass) */
-               VBO_StreamSync(wrk);
-               RES_StreamWrite(wrk->sp);
-       }
-
-       if (wrk->busyobj->stream_frontchunk == NULL)
-               return;
-
-       /* It's a pass - remove chunks already delivered. Should be OK
-        * to do lock-free, as we are not fiddling pointers of any
-        * storage chunk passed busyobj->stream_frontchunk */
+       bo = wrk->busyobj;
+       CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
        fetch_obj = wrk->busyobj->fetch_obj;
        CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
-       assert(fetch_obj->objcore == NULL ||
-              (fetch_obj->objcore->flags & OC_F_PASS));
+
+       if (fetch_obj->objcore == NULL || fetch_obj->objcore->flags & OC_F_PASS)
+               pass = 1;
+
+       VBO_StreamData(bo);
        while (1) {
-               st = VTAILQ_FIRST(&fetch_obj->store);
-               if (st == NULL || st == wrk->busyobj->stream_frontchunk)
+               if (bo->do_stream_flipflop == 1) {
+                       CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+                       VBO_StreamSync(wrk);
+                       RES_StreamWrite(wrk->sp);
+               }
+
+               if (bo->stream_frontchunk != NULL) {
+                       /* It's a pass - remove chunks already
+                        * delivered. Should be OK to do lock-free, as
+                        * we are not fiddling pointers of any storage
+                        * chunk beyond busyobj->stream_frontchunk */
+                       AN(pass);
+                       while (1) {
+                               st = VTAILQ_FIRST(&fetch_obj->store);
+                               if (st == NULL ||
+                                   st == bo->stream_frontchunk)
+                                       break;
+                               CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
+                               VTAILQ_REMOVE(&fetch_obj->store, st, list);
+                               STV_free(st);
+                       }
+               }
+
+               if (pass == 0 || bo->stream_pass_bufsize == 0 ||
+                   bo->stream_max - bo->stream_next < bo->stream_pass_bufsize)
                        break;
-               CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
-               VTAILQ_REMOVE(&fetch_obj->store, st, list);
-               STV_free(st);
+
+               /* Loop around waiting for the lag behind to be less
+                * than some configurable size, to keep the cache
+                * memory usage low (this for streaming large objects
+                * with pass) */
+               if (bo->do_stream_flipflop == 0)
+                       VTIM_sleep(0.1);
        }
 }
-- 
1.7.4.1


_______________________________________________
varnish-dev mailing list
[email protected]
https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev

Reply via email to