VBO_StreamData is called by the fetch to update the busyobj with how
much data is available.

VBO_StreamSync is called by the delivery to update its local
stream_ctx struct with the new pointers.

VBO_StreamStopped signals object fetch has finished.

VBO_StreamWait waits for the streaming thread to finish looking at
the other thread's workspace.
---
 bin/varnishd/cache/cache.h         |   16 ++++++
 bin/varnishd/cache/cache_busyobj.c |   91 ++++++++++++++++++++++++++++++++++++
 2 files changed, 107 insertions(+), 0 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 0bc3f59..0dd8ef4 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -271,6 +271,13 @@ struct stream_ctx {
 
        /* First byte of storage if we free it as we go (pass) */
        ssize_t                 stream_front;
+       struct storage          *stream_frontchunk;
+
+       /* Max byte we can stream */
+       ssize_t                 stream_max;
+
+       /* Backend fetch has finished */
+       unsigned                stream_stopped;
 };
 
 /*--------------------------------------------------------------------*/
@@ -530,6 +537,11 @@ struct busyobj {
        unsigned                do_gzip;
        unsigned                do_gunzip;
        unsigned                do_stream;
+
+       /* Stream stuff */
+       ssize_t                 stream_max;
+       struct storage          *stream_frontchunk;
+       unsigned                stream_stopped;
 };
 
 /* Object structure --------------------------------------------------*/
@@ -738,6 +750,10 @@ struct busyobj *VBO_GetBusyObj(struct worker *wrk);
 struct busyobj *VBO_RefBusyObj(struct busyobj *busyobj);
 unsigned VBO_DerefBusyObj(struct worker *wrk, struct busyobj **busyobj);
 void VBO_Free(struct vbo **vbo);
+void VBO_StreamStopped(struct busyobj *busyobj);
+void VBO_StreamWait(struct busyobj *busyobj);
+void VBO_StreamData(struct busyobj *busyobj);
+void VBO_StreamSync(struct worker *wrk);
 
 /* cache_center.c [CNT] */
 void CNT_Session(struct sess *sp);
diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c
index 423df93..7651999 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -208,3 +208,94 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo)
 
        return (r);
 }
+
+/* Signal that the fetch thread has stopped by setting stream_stopped */
+void
+VBO_StreamStopped(struct busyobj *busyobj)
+{
+       if (!busyobj->use_locks) { /* no locking: just set the flag */
+               busyobj->stream_stopped = 1;
+               return;
+       }
+       Lck_Lock(&busyobj->vbo->mtx);
+       busyobj->stream_stopped = 1; /* set under mtx, then wake waiters */
+       AZ(pthread_cond_broadcast(&busyobj->vbo->cond));
+       Lck_Unlock(&busyobj->vbo->mtx);
+}
+
+/* Wait for the fetch thread to finish with the pipeline buffer, or to stop */
+void
+VBO_StreamWait(struct busyobj *busyobj)
+{
+       if (!busyobj->use_locks) /* no locking: return without waiting */
+               return;
+       Lck_Lock(&busyobj->vbo->mtx);
+       while (busyobj->htc.pipeline.b != NULL && busyobj->stream_stopped == 0)
+               Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx, NULL);
+       Lck_Unlock(&busyobj->vbo->mtx);
+}
+
+/* Signal additional data available: raise stream_max to fetch_obj->len */
+void
+VBO_StreamData(struct busyobj *busyobj)
+{
+       CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+       CHECK_OBJ_NOTNULL(busyobj->fetch_obj, OBJECT_MAGIC);
+
+       if (busyobj->use_locks)
+               Lck_Lock(&busyobj->vbo->mtx);
+       assert(busyobj->fetch_obj->len >= busyobj->stream_max); /* no shrink */
+       if (busyobj->fetch_obj->len > busyobj->stream_max) {
+               busyobj->stream_max = busyobj->fetch_obj->len;
+               if (busyobj->use_locks) /* wake cond waiters */
+                       AZ(pthread_cond_broadcast(&busyobj->vbo->cond));
+       }
+       if (busyobj->use_locks)
+               Lck_Unlock(&busyobj->vbo->mtx);
+}
+
+/* Sync the client's stream_ctx with the busyobj. When locking and the client
+ * has caught up (stream_next == stream_max), wait for more data or a stop */
+void
+VBO_StreamSync(struct worker *wrk)
+{
+       struct busyobj *busyobj;
+       struct stream_ctx *sctx;
+
+       CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+       CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+       busyobj = wrk->busyobj;
+       CHECK_OBJ_NOTNULL(wrk->sp, SESS_MAGIC);
+       CHECK_OBJ_NOTNULL(wrk->sp->req, REQ_MAGIC);
+       CHECK_OBJ_NOTNULL(wrk->sp->req->obj, OBJECT_MAGIC);
+       CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+       sctx = wrk->sctx;
+
+       if (busyobj->use_locks)
+               Lck_Lock(&busyobj->vbo->mtx);
+       assert(sctx->stream_max <= busyobj->stream_max); /* never ahead */
+
+       if (wrk->sp->req->obj->objcore == NULL ||
+           (wrk->sp->req->obj->objcore->flags & OC_F_PASS)) {
+               /* Give notice to backend fetch that we are finished
+                * with all chunks before this one */
+               busyobj->stream_frontchunk = sctx->stream_frontchunk;
+       }
+
+       sctx->stream_stopped = busyobj->stream_stopped; /* snapshot */
+       sctx->stream_max = busyobj->stream_max;
+
+       if (busyobj->use_locks && !sctx->stream_stopped &&
+           sctx->stream_next == sctx->stream_max) {
+               while (!busyobj->stream_stopped &&
+                      sctx->stream_max == busyobj->stream_max) {
+                       Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx,
+                                    NULL);
+               }
+               sctx->stream_stopped = busyobj->stream_stopped; /* re-sync */
+               sctx->stream_max = busyobj->stream_max;
+       }
+
+       if (busyobj->use_locks)
+               Lck_Unlock(&busyobj->vbo->mtx);
+}
-- 
1.7.4.1


_______________________________________________
varnish-dev mailing list
[email protected]
https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev

Reply via email to