Deal with the thundering herd problem by limiting the number of clients
allowed to wait for additional data on the stream to stream_tokens.
Only threads that acquire a token are allowed to wait on the condvar;
the others are placed on a queue. A token is passed on to the first
queued client once the token-holding thread has sent some data to its
client.
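
For reference, the hand-off works roughly like the sketch below. This is
a simplified, stand-alone illustration of the idea only; the names used
here (waiter, token_pool, acquire_token, release_token) are made up for
the example and are not the identifiers in the patch, which builds on
the existing vbo lock, per-worker condvars and VTAILQ. The timeout
handling described in the next paragraph is omitted from this sketch.

    #include <pthread.h>
    #include <sys/queue.h>

    struct waiter {
            pthread_cond_t          cond;   /* initialized by the caller */
            int                     has_token;
            TAILQ_ENTRY(waiter)     list;
    };

    static pthread_mutex_t          mtx = PTHREAD_MUTEX_INITIALIZER;
    static TAILQ_HEAD(, waiter)     queue = TAILQ_HEAD_INITIALIZER(queue);
    static unsigned                 token_pool = 10;  /* cf. stream_tokens */

    /* Take a token if one is free, otherwise queue up and sleep until a
       releasing thread hands one over.  Caller holds mtx. */
    static void
    acquire_token(struct waiter *w)
    {
            if (token_pool > 0) {
                    token_pool--;
                    w->has_token = 1;
                    return;
            }
            TAILQ_INSERT_TAIL(&queue, w, list);
            while (!w->has_token)
                    pthread_cond_wait(&w->cond, &mtx);
    }

    /* Pass the token to the first queued waiter, or return it to the
       pool if nobody is waiting.  Caller holds mtx. */
    static void
    release_token(struct waiter *w)
    {
            struct waiter *next = TAILQ_FIRST(&queue);

            if (next != NULL) {
                    TAILQ_REMOVE(&queue, next, list);
                    next->has_token = 1;
                    pthread_cond_signal(&next->cond);
            } else
                    token_pool++;
            w->has_token = 0;
    }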

The maximum time to wait for a token is controlled by the
stream_token_timeout run-time parameter. This prevents a token-holding
client that suddenly blocks from starving other clients waiting for a
token.
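
The millisecond parameter is converted into the absolute CLOCK_REALTIME
deadline passed to Lck_CondWait() (ultimately pthread_cond_timedwait()),
roughly as sketched below. This mirrors what vbo_acquire_token() in the
patch does; deadline_ms is only an illustrative helper, not a function
added by the patch.

    #include <time.h>

    /* Turn a relative timeout in milliseconds into an absolute
       CLOCK_REALTIME deadline. */
    static struct timespec
    deadline_ms(unsigned timeout_ms)
    {
            struct timespec ts;

            clock_gettime(CLOCK_REALTIME, &ts);
            ts.tv_sec += timeout_ms / 1000;
            ts.tv_nsec += (long)(timeout_ms % 1000) * 1000000L;
            if (ts.tv_nsec >= 1000000000L) {        /* carry into seconds */
                    ts.tv_sec++;
                    ts.tv_nsec -= 1000000000L;
            }
            return (ts);
    }

A waiter that is still queued when the deadline passes gives up, removes
itself from the queue and is counted in the new n_tokentimeout counter.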
---
 bin/varnishd/cache/cache.h         |    5 ++
 bin/varnishd/cache/cache_busyobj.c |   96 +++++++++++++++++++++++++++++++++++-
 bin/varnishd/cache/cache_center.c  |    4 ++
 bin/varnishd/cache/cache_vrt_var.c |   20 ++++++++
 bin/varnishd/common/params.h       |    2 +
 bin/varnishd/mgt/mgt_param.c       |   12 +++++
 include/tbl/vsc_f_main.h           |    3 +
 lib/libvcl/generate.py             |    6 ++
 8 files changed, 146 insertions(+), 2 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index a64a0ae..b02bea6 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -278,6 +278,9 @@ struct stream_ctx {
 
        /* Backend fetch has finished */
        unsigned                stream_stopped;
+
+       /* Are we currently holding a token from the busyobj */
+       unsigned                has_token;
 };
 
 /*--------------------------------------------------------------------*/
@@ -545,6 +548,7 @@ struct busyobj {
        volatile struct storage *stream_frontchunk;
        unsigned                stream_stopped;
        ssize_t                 stream_pass_bufsize;
+       unsigned                stream_tokens;
 };
 
 /* Object structure --------------------------------------------------*/
@@ -759,6 +763,7 @@ void VBO_StreamStopped(struct busyobj *busyobj);
 void VBO_StreamWait(struct busyobj *busyobj);
 void VBO_StreamData(struct busyobj *busyobj);
 void VBO_StreamSync(struct worker *wrk);
+void VBO_ReleaseToken(struct worker *wrk, struct busyobj *busyobj);
 
 /* cache_center.c [CNT] */
 void CNT_Session(struct sess *sp);
diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c
index 8b97d4a..2b6ebc0 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -43,6 +43,7 @@ struct vbo {
 #define VBO_MAGIC              0xde3d8223
        struct lock             mtx;
        pthread_cond_t          cond;
+       VTAILQ_HEAD(, worker)   token_wait_queue;
        unsigned                refcount;
        uint16_t                nhttp;
        struct busyobj          bo;
@@ -51,6 +52,9 @@ struct vbo {
 static struct lock vbo_mtx;
 static struct vbo *nvbo;
 
+static void vbo_acquire_token(struct worker *wrk, struct busyobj *busyobj);
+static void vbo_release_token(struct worker *wrk, struct busyobj *busyobj);
+
 void
 VBO_Init(void)
 {
@@ -82,6 +86,7 @@ vbo_New(void)
        vbo->nhttp = nhttp;
        Lck_New(&vbo->mtx, lck_busyobj);
        AZ(pthread_cond_init(&vbo->cond, NULL));
+       VTAILQ_INIT(&vbo->token_wait_queue);
        return (vbo);
 }
 
@@ -145,6 +150,7 @@ VBO_GetBusyObj(struct worker *wrk)
        vbo->bo.beresp = HTTP_create(p, vbo->nhttp);
 
        vbo->bo.stream_pass_bufsize = cache_param->stream_pass_bufsize;
+       vbo->bo.stream_tokens = cache_param->stream_tokens;
 
        return (&vbo->bo);
 }
@@ -291,6 +297,11 @@ VBO_StreamSync(struct worker *wrk)
                Lck_Lock(&busyobj->vbo->mtx);
        assert(sctx->stream_max <= busyobj->stream_max);
 
+       if (sctx->has_token)
+               /* Release the token to give other clients that have
+                * reached the end of the data a chance */
+               vbo_release_token(wrk, busyobj);
+
        if (wrk->sp->req->obj->objcore == NULL ||
            (wrk->sp->req->obj->objcore->flags & OC_F_PASS)) {
                /* Give notice to backend fetch that we are finished
@@ -303,8 +314,11 @@ VBO_StreamSync(struct worker *wrk)
        sctx->stream_stopped = busyobj->stream_stopped;
        sctx->stream_max = busyobj->stream_max;
 
-       if (busyobj->use_locks && sctx->stream_next == sctx->stream_max) {
-               while (!busyobj->stream_stopped &&
+       if (busyobj->use_locks && !sctx->stream_stopped &&
+           sctx->stream_next == sctx->stream_max) {
+               /* We've exhausted available data, wait for more */
+               vbo_acquire_token(wrk, busyobj);
+               while (sctx->has_token && !busyobj->stream_stopped &&
                       sctx->stream_max == busyobj->stream_max) {
                        Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx,
                                     NULL);
@@ -316,3 +330,81 @@ VBO_StreamSync(struct worker *wrk)
        if (busyobj->use_locks)
                Lck_Unlock(&busyobj->vbo->mtx);
 }
+
+/* Acquire a token for the worker from the busyobj. If none is available,
+   wait up to stream_token_timeout ms on the queue.
+   wrk->sctx->has_token will be true if successful */
+static void
+vbo_acquire_token(struct worker *wrk, struct busyobj *busyobj)
+{
+       struct timespec ts;
+
+       CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+       CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+       AZ(wrk->sctx->has_token);
+       CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+       AN(busyobj->use_locks);
+
+       if (busyobj->stream_tokens > 0) {
+               busyobj->stream_tokens--;
+               wrk->sctx->has_token = 1;
+               return;
+       }
+
+       AZ(clock_gettime(CLOCK_REALTIME, &ts));
+       ts.tv_sec += cache_param->stream_token_timeout / 1000;
+       ts.tv_nsec += (cache_param->stream_token_timeout % 1000) * 1000000;
+       if (ts.tv_nsec >= 1000000000) {
+               ts.tv_sec++;
+               ts.tv_nsec -= 1000000000;
+       }
+       VTAILQ_INSERT_TAIL(&busyobj->vbo->token_wait_queue, wrk, list);
+       Lck_CondWait(&wrk->cond, &busyobj->vbo->mtx, &ts);
+       if (wrk->sctx->has_token == 0) {
+               VTAILQ_REMOVE(&busyobj->vbo->token_wait_queue, wrk, list);
+               wrk->stats.n_tokentimeout++;
+       }
+}
+
+/* Release our currently held token, passing it on to the first on the
+   queue if the queue is non-empty. Resets wrk->sctx->has_token */
+static void
+vbo_release_token(struct worker *wrk, struct busyobj *busyobj)
+{
+       struct worker *wrk_waiting;
+
+       CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+       CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+       AN(wrk->sctx->has_token);
+       CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+       AN(busyobj->use_locks);
+
+       wrk_waiting = VTAILQ_FIRST(&busyobj->vbo->token_wait_queue);
+       if (wrk_waiting != NULL) {
+               /* Transfer our token to the first on the waiting
+                  list, and wake it */
+               CHECK_OBJ_NOTNULL(wrk_waiting, WORKER_MAGIC);
+               CHECK_OBJ_NOTNULL(wrk_waiting->sctx, STREAM_CTX_MAGIC);
+               VTAILQ_REMOVE(&busyobj->vbo->token_wait_queue, wrk_waiting,
+                             list);
+               AZ(wrk_waiting->sctx->has_token);
+               wrk_waiting->sctx->has_token = 1;
+               AZ(pthread_cond_signal(&wrk_waiting->cond));
+       } else {
+               busyobj->stream_tokens++;
+       }
+
+       wrk->sctx->has_token = 0;
+}
+
+void
+VBO_ReleaseToken(struct worker *wrk, struct busyobj *busyobj)
+{
+       CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+
+       if (busyobj->use_locks == 0)
+               return;
+       Lck_Lock(&busyobj->vbo->mtx);
+       vbo_release_token(wrk, busyobj);
+       Lck_Unlock(&busyobj->vbo->mtx);
+}
diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c
index 96e23fc..5daa949 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -1084,6 +1084,10 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
        else
                AN(wrk->sctx->stream_stopped);
 
+       if (wrk->sctx->has_token)
+               VBO_ReleaseToken(wrk, wrk->busyobj);
+       AZ(wrk->sctx->has_token);
+
        if (wrk->busyobj->htc.ws == wrk->ws) {
                /* Busyobj's htc has buffer on our workspace,
                   wait for it to be released */
diff --git a/bin/varnishd/cache/cache_vrt_var.c b/bin/varnishd/cache/cache_vrt_var.c
index e655abf..82d77c6 100644
--- a/bin/varnishd/cache/cache_vrt_var.c
+++ b/bin/varnishd/cache/cache_vrt_var.c
@@ -320,6 +320,26 @@ VRT_l_beresp_stream_pass_bufsize(const struct sess *sp, double val)
                sp->wrk->busyobj->stream_pass_bufsize = 0;
 }
 
+int
+VRT_r_beresp_stream_tokens(const struct sess *sp)
+{
+       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+       CHECK_OBJ_NOTNULL(sp->wrk->busyobj, BUSYOBJ_MAGIC);
+       return (sp->wrk->busyobj->stream_tokens);
+}
+
+void
+VRT_l_beresp_stream_tokens(const struct sess *sp, int val)
+{
+       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+       CHECK_OBJ_NOTNULL(sp->wrk->busyobj, BUSYOBJ_MAGIC);
+       if (val >= 1)
+               sp->wrk->busyobj->stream_tokens = val;
+       else
+               sp->wrk->busyobj->stream_tokens = 1;
+}
+
+
 /*--------------------------------------------------------------------*/
 
 void
diff --git a/bin/varnishd/common/params.h b/bin/varnishd/common/params.h
index 20b346a..a68d608 100644
--- a/bin/varnishd/common/params.h
+++ b/bin/varnishd/common/params.h
@@ -101,6 +101,8 @@ struct params {
        ssize_t                 stream_maxchunksize;
        unsigned                stream_grab_timeout;
        ssize_t                 stream_pass_bufsize;
+       unsigned                stream_tokens;
+       unsigned                stream_token_timeout;
 
        unsigned                nuke_limit;
 
diff --git a/bin/varnishd/mgt/mgt_param.c b/bin/varnishd/mgt/mgt_param.c
index 3a64b96..e5d6888 100644
--- a/bin/varnishd/mgt/mgt_param.c
+++ b/bin/varnishd/mgt/mgt_param.c
@@ -858,6 +858,18 @@ static const struct parspec input_parspec[] = {
                "Zero means unlimited.\n",
                EXPERIMENTAL,
                "10mb", "bytes" },
+       { "stream_tokens",
+               tweak_uint, &mgt_param.stream_tokens, 1, UINT_MAX,
+               "Default number of tokens available to streaming clients "
+               "waiting for more data on an object.\n",
+               EXPERIMENTAL,
+               "10", "tokens" },
+       { "stream_token_timeout",
+               tweak_uint, &mgt_param.stream_token_timeout, 1, UINT_MAX,
+               "Timeout for acquiring a token when a streaming client "
+               "needs to wait for more data.\n",
+               EXPERIMENTAL,
+               "100", "ms" },
 #ifdef SENDFILE_WORKS
        { "sendfile_threshold",
                tweak_bytes, &mgt_param.sendfile_threshold, 0, HUGE_VAL,
diff --git a/include/tbl/vsc_f_main.h b/include/tbl/vsc_f_main.h
index 5761c51..415b07d 100644
--- a/include/tbl/vsc_f_main.h
+++ b/include/tbl/vsc_f_main.h
@@ -262,6 +262,9 @@ VSC_F(n_objwrite,           uint64_t, 0, 'a', "Objects sent with write",
       "or if the sendfile call has been disabled")
 VSC_F(n_objoverflow,   uint64_t, 1, 'a',
                                        "Objects overflowing workspace", "")
+VSC_F(n_tokentimeout,  uint64_t, 1, 'a', "Token wait timeouts",
+      "The number of times a fast streaming client has timed out waiting for "
+      "a token to be notified about new incoming data.")
 
 VSC_F(s_sess,                  uint64_t, 1, 'a', "Total Sessions", "")
 VSC_F(s_req,                   uint64_t, 1, 'a', "Total Requests", "")
diff --git a/lib/libvcl/generate.py b/lib/libvcl/generate.py
index 51b2294..1dab49d 100755
--- a/lib/libvcl/generate.py
+++ b/lib/libvcl/generate.py
@@ -337,6 +337,12 @@ sp_variables = (
                ( 'fetch',),
                'const struct sess *'
        ),
+       ('beresp.stream_tokens',
+               'INT',
+               ( 'fetch',),
+               ( 'fetch',),
+               'const struct sess *'
+       ),
        ('beresp.ttl',
                'DURATION',
                ( 'fetch',),
-- 
1.7.4.1

