---
 bin/varnishd/cache/cache_center.c |  117 ++++++++++++++++++++++---------------
 bin/varnishd/cache/cache_expire.c |    1 -
 bin/varnishd/cache/cache_fetch.c  |    2 -
 bin/varnishd/cache/cache_hash.c   |   33 ++++++++++-
 bin/varnishd/hash/hash_slinger.h  |    1 +
 5 files changed, 103 insertions(+), 51 deletions(-)

diff --git a/bin/varnishd/cache/cache_center.c 
b/bin/varnishd/cache/cache_center.c
index 7a9134e..23b5f71 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -224,7 +224,6 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct 
req *req)
        if (wrk->busyobj != NULL) {
                CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
                AN(wrk->busyobj->do_stream);
-               AssertObjCorePassOrBusy(req->obj->objcore);
        }
 
        wrk->res_mode = 0;
@@ -301,14 +300,17 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct 
req *req)
        case VCL_RET_RESTART:
                if (req->restarts >= cache_param->max_restarts)
                        break;
-               if (wrk->busyobj != NULL) {
+               if (wrk->busyobj != NULL &&
+                   (req->obj->objcore == NULL ||
+                    req->obj->objcore->flags & OC_F_BUSY)) {
                        AN(wrk->busyobj->do_stream);
                        VDI_CloseFd(wrk, &wrk->busyobj->vbc);
                        HSH_Drop(wrk);
-                       VBO_DerefBusyObj(wrk, &wrk->busyobj);
                } else {
                        (void)HSH_Deref(wrk, NULL, &req->obj);
                }
+               if (wrk->busyobj != NULL)
+                       (void)VBO_DerefBusyObj(wrk, &wrk->busyobj);
                AZ(req->obj);
                req->restarts++;
                req->director = NULL;
@@ -318,8 +320,8 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct 
req *req)
        default:
                WRONG("Illegal action in vcl_deliver{}");
        }
-       if (wrk->busyobj != NULL && wrk->busyobj->do_stream) {
-               AssertObjCorePassOrBusy(req->obj->objcore);
+       if (wrk->busyobj != NULL) {
+               AN(wrk->busyobj->do_stream);
                sp->step = STP_STREAMBODY;
        } else {
                sp->step = STP_DELIVER;
@@ -983,13 +985,18 @@ cnt_streambody_task(struct worker *wrk, void *priv)
        objcore = obj->objcore;
 
        wrk->busyobj->fetch_failed = FetchBody(wrk, wrk->busyobj);
+       VBO_StreamStopped(wrk->busyobj);
 
        wrk->stats.fetch_threaded++;
 
+       if (obj->objcore != NULL) {
+               HSH_RemoveBusyObj(wrk, obj->objcore);
+               if (wrk->busyobj->fetch_failed == 0)
+                       EXP_Insert(obj);
+       }
        AZ(wrk->busyobj->fetch_obj);
        AZ(wrk->busyobj->vbc);
        wrk->busyobj->vfp = NULL;
-       VBO_StreamStopped(wrk->busyobj);
 
        u = VBO_DerefBusyObj(wrk, &wrk->busyobj);
        if (objcore != NULL || u == 0) {
@@ -1013,6 +1020,7 @@ cnt_streambody(struct sess *sp, struct worker *wrk, 
struct req *req)
        CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
 
        CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+       AN(wrk->busyobj->do_stream);
        memset(&sctx, 0, sizeof sctx);
        sctx.magic = STREAM_CTX_MAGIC;
        AZ(wrk->sctx);
@@ -1024,58 +1032,72 @@ cnt_streambody(struct sess *sp, struct worker *wrk, 
struct req *req)
                sctx.obuf_len = sizeof (obuf);
        }
 
-       AssertObjCorePassOrBusy(req->obj->objcore);
        RES_StreamStart(sp);
 
-       /* MBGXXX: Test on OC_F_BUSY to see if we should initiate
-        * fetch at all. This code now assumes all passes through here
-        * needs to do the fetch as well. (Multiple streaming clients
-        * not implemented yet) */
-       wrk->acct_tmp.fetch++;
-       AZ(wrk->busyobj->fetch_obj);
-       wrk->busyobj->fetch_obj = req->obj;
-       http_Setup(wrk->busyobj->bereq, NULL);
-       http_Setup(wrk->busyobj->beresp, NULL);
-       wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable
-                                          * thread grabbing
-                                          * timeout */
-       if (wrk_ex != NULL) {
-               /* Set up separate thread fetch */
-               wrk->busyobj->use_locks = 1;
-               if (req->obj->objcore != NULL)
-                       /* Grab a ref on the objcore for the other thread */
-                       HSH_Ref(req->obj->objcore);
-               VBO_RefBusyObj(wrk->busyobj); /* Ref for the other thread */
-               WRK_DoTask(wrk_ex, cnt_streambody_task, wrk->busyobj);
-       } else {
-               /* We have no worker */
-               if (wrk->busyobj->fetch_obj->objcore == NULL ||
-                   wrk->busyobj->fetch_obj->objcore->flags & OC_F_PASS) {
-                       /* It's a pass, prefer flipflop
-                        * streaming. (MBGXXX: Flipflop not finished
-                        * yet) */
-                       wrk->busyobj->do_stream_flipflop = 1;
-                       wrk->stats.fetch_flipflop++;
+       if (req->obj->objcore == NULL || req->obj->objcore->flags & OC_F_BUSY) {
+               /* Initiate a fetch of the body */
+               wrk->acct_tmp.fetch++;
+               AZ(wrk->busyobj->fetch_obj);
+               wrk->busyobj->fetch_obj = req->obj;
+               http_Setup(wrk->busyobj->bereq, NULL);
+               http_Setup(wrk->busyobj->beresp, NULL);
+               wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable
+                                                  * thread grabbing
+                                                  * timeout */
+               if (wrk_ex != NULL)
+                       wrk->busyobj->use_locks = 1;
+               if (req->obj->objcore != NULL) {
+                       AN(req->obj->objcore->ban);
+                       HSH_Unbusy(wrk);
+                       AN(req->obj->objcore->busyobj);
+               }
+               if (wrk_ex != NULL) {
+                       /* Set up separate thread fetch */
+                       if (req->obj->objcore != NULL)
+                               /* Grab a ref on the objcore for the
+                                * other thread */
+                               HSH_Ref(req->obj->objcore);
+                       WRK_DoTask(wrk_ex, cnt_streambody_task,
+                                  VBO_RefBusyObj(wrk->busyobj));
+               } else {
+                       /* We have no fetch worker */
+                       if (req->obj->objcore == NULL ||
+                           req->obj->objcore->flags & OC_F_PASS) {
+                               /* It's a pass, prefer flipflop
+                                * streaming. (MBGXXX: Flipflop not
+                                * finished yet) */
+                               wrk->busyobj->do_stream_flipflop = 1;
+                               wrk->stats.fetch_flipflop++;
+                       }
+                       wrk->busyobj->fetch_failed =
+                               FetchBody(wrk, wrk->busyobj);
+                       VBO_StreamStopped(wrk->busyobj);
+                       if (req->obj->objcore != NULL) {
+                               HSH_RemoveBusyObj(wrk, req->obj->objcore);
+                               if (wrk->busyobj->fetch_failed == 0)
+                                       EXP_Insert(req->obj);
+                       }
+                       AZ(wrk->busyobj->fetch_obj);
+                       AZ(wrk->busyobj->vbc);
+                       wrk->busyobj->vfp = NULL;
+                       VBO_StreamSync(wrk);
                }
-               wrk->busyobj->fetch_failed = FetchBody(sp->wrk, wrk->busyobj);
-               VBO_StreamStopped(wrk->busyobj);
        }
 
-       RES_StreamBody(sp);
+       if (wrk->busyobj->do_stream_flipflop == 0)
+               RES_StreamBody(sp);
+       else
+               AN(wrk->sctx->stream_stopped);
 
-       if (wrk->busyobj->htc.ws == wrk->ws)
+       if (wrk->busyobj->htc.ws == wrk->ws) {
                /* Busyobj's htc has buffer on our workspace,
                   wait for it to be released */
+               AZ(wrk->busyobj->do_stream_flipflop);
                VBO_StreamWait(wrk->busyobj);
+       }
 
        if (wrk->busyobj->fetch_failed) {
                req->doclose = "Stream error";
-       } else if (req->obj->objcore != NULL) {
-               /* MBGXXX: This should be done on the bg task */
-               EXP_Insert(req->obj);
-               AN(req->obj->objcore);
-               AN(req->obj->objcore->ban);
-               HSH_Unbusy(wrk);
        }
        req->director = NULL;
        req->restarts = 0;
@@ -1169,7 +1191,6 @@ cnt_hit(struct sess *sp, struct worker *wrk, struct req 
*req)
 
        CHECK_OBJ_NOTNULL(req->obj, OBJECT_MAGIC);
        CHECK_OBJ_NOTNULL(req->vcl, VCL_CONF_MAGIC);
-       AZ(wrk->busyobj);
 
        assert(!(req->obj->objcore->flags & OC_F_PASS));
 
@@ -1315,6 +1336,8 @@ cnt_lookup(struct sess *sp, struct worker *wrk, struct 
req *req)
                wrk->stats.cache_hitpass++;
                WSP(sp, SLT_HitPass, "%u", req->obj->xid);
                (void)HSH_Deref(wrk, NULL, &req->obj);
+               if (wrk->busyobj != NULL)
+                       (void)VBO_DerefBusyObj(wrk, &wrk->busyobj);
                req->objcore = NULL;
                sp->step = STP_PASS;
                return (0);
diff --git a/bin/varnishd/cache/cache_expire.c 
b/bin/varnishd/cache/cache_expire.c
index e93a1aa..212adf3 100644
--- a/bin/varnishd/cache/cache_expire.c
+++ b/bin/varnishd/cache/cache_expire.c
@@ -224,7 +224,6 @@ EXP_Insert(struct object *o)
        CHECK_OBJ_NOTNULL(o, OBJECT_MAGIC);
        oc = o->objcore;
        CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
-       AssertObjBusy(o);
        HSH_Ref(oc);
 
        assert(o->exp.entered != 0 && !isnan(o->exp.entered));
diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c
index d5ae4d5..0643389 100644
--- a/bin/varnishd/cache/cache_fetch.c
+++ b/bin/varnishd/cache/cache_fetch.c
@@ -507,8 +507,6 @@ FetchBody(struct worker *wrk, struct busyobj *bo)
        if (bo->vfp == NULL)
                bo->vfp = &vfp_nop;
 
-       AssertObjCorePassOrBusy(obj->objcore);
-
        AZ(bo->vgz_rx);
        AZ(VTAILQ_FIRST(&obj->store));
 
diff --git a/bin/varnishd/cache/cache_hash.c b/bin/varnishd/cache/cache_hash.c
index e78eb60..a12d43a 100644
--- a/bin/varnishd/cache/cache_hash.c
+++ b/bin/varnishd/cache/cache_hash.c
@@ -63,6 +63,8 @@
 
 static const struct hash_slinger *hash;
 
+static void hsh_rush(struct objhead *oh);
+
 /*---------------------------------------------------------------------*/
 /* Precreate an objhead and object for later use */
 void
@@ -414,6 +416,13 @@ HSH_Lookup(struct sess *sp, struct objhead **poh)
                if (o->hits < INT_MAX)
                        o->hits++;
                assert(oh->refcnt > 1);
+               if (oc->busyobj != NULL) {
+                       /* It's streamable */
+                       CHECK_OBJ_NOTNULL(oc->busyobj, BUSYOBJ_MAGIC);
+                       wrk->busyobj = VBO_RefBusyObj(oc->busyobj);
+                       if (oh->waitinglist != NULL)
+                               hsh_rush(oh);
+               }
                Lck_Unlock(&oh->mtx);
                assert(hash->deref(oh));
                *poh = oh;
@@ -623,7 +632,7 @@ HSH_Unbusy(struct worker *wrk)
        VTAILQ_REMOVE(&oh->objcs, oc, list);
        VTAILQ_INSERT_HEAD(&oh->objcs, oc, list);
        oc->flags &= ~OC_F_BUSY;
-       if (oc->busyobj != NULL)
+       if (oc->busyobj->do_stream == 0)
                (void)VBO_DerefBusyObj(wrk, &oc->busyobj);
        if (oh->waitinglist != NULL)
                hsh_rush(oh);
@@ -632,6 +641,28 @@ HSH_Unbusy(struct worker *wrk)
        assert(oc_getobj(wrk, oc) == o);
 }
 
+/*---------------------------------------------------------------------
+ * Drop the objcore's ref on the busyobj while holding the objhead mutex
+ */
+
+void
+HSH_RemoveBusyObj(struct worker *wrk, struct objcore *oc)
+{
+       struct objhead *oh;
+
+       CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+       CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
+       oh = oc->objhead;
+       CHECK_OBJ_NOTNULL(oh, OBJHEAD_MAGIC);
+
+       AZ(oc->flags & OC_F_BUSY);
+       AN(oc->busyobj);
+       assert(oc->busyobj->do_stream == 0 || oc->busyobj->stream_stopped == 1);
+       Lck_Lock(&oh->mtx);
+       (void)VBO_DerefBusyObj(wrk, &oc->busyobj);
+       Lck_Unlock(&oh->mtx);
+}
+
 void
 HSH_Ref(struct objcore *oc)
 {
diff --git a/bin/varnishd/hash/hash_slinger.h b/bin/varnishd/hash/hash_slinger.h
index b45e604..8affa42 100644
--- a/bin/varnishd/hash/hash_slinger.h
+++ b/bin/varnishd/hash/hash_slinger.h
@@ -55,6 +55,7 @@ void HSH_Prealloc(const struct sess *sp);
 void HSH_Cleanup(struct worker *w);
 struct objcore *HSH_Lookup(struct sess *sp, struct objhead **poh);
 void HSH_Unbusy(struct worker *wrk);
+void HSH_RemoveBusyObj(struct worker *wrk, struct objcore *oc);
 void HSH_Ref(struct objcore *o);
 void HSH_Drop(struct worker *wrk);
 void HSH_Init(const struct hash_slinger *slinger);
-- 
1.7.4.1


_______________________________________________
varnish-dev mailing list
[email protected]
https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev

Reply via email to