Hi Poul-Henning & varnish-dev, Please find attached a series of patches implementing streaming with a background fetch thread. Multi-client streaming is coming later. Any comments are appreciated.
Regards, Martin Blix Grydeland -- Martin Blix Grydeland Varnish Software AS
From 780842c7056e0e69bccfd7f5c1752d63c3dbfd74 Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland <[email protected]> Date: Tue, 15 Nov 2011 12:03:56 +0100 Subject: [PATCH 01/10] Add task scheduling bits --- bin/varnishd/cache/cache.h | 5 ++ bin/varnishd/cache/cache_pool.c | 87 ++++++++++++++++++++++++++++++----- bin/varnishd/cache/cache_session.c | 19 ++++++++ 3 files changed, 98 insertions(+), 13 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index 9601270..a54d903 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -114,6 +114,7 @@ struct vrt_backend; struct vsb; struct waitinglist; struct worker; +struct task; #define DIGEST_LEN 32 @@ -308,6 +309,7 @@ struct worker { pthread_cond_t cond; VTAILQ_ENTRY(worker) list; + struct task *tp; struct sess *sp; struct VCL_conf *vcl; @@ -870,6 +872,8 @@ void Pool_Init(void); void Pool_Work_Thread(void *priv, struct worker *w); void Pool_Wait(struct sess *sp); int Pool_Schedule(struct pool *pp, struct sess *sp); +typedef void taskfunc(struct worker *, void *priv); +void Pool_Schedule_Task(struct pool *pp, taskfunc *func, void *priv); #define WRW_IsReleased(w) ((w)->wrw.wfd == NULL) int WRW_Error(const struct worker *w); @@ -893,6 +897,7 @@ void SES_Charge(struct sess *sp); struct sesspool *SES_NewPool(struct pool *pp); void SES_DeletePool(struct sesspool *sp, struct worker *wrk); int SES_Schedule(struct sess *sp); +void SES_Schedule_Task(struct sess *sp, taskfunc *func, void *priv); /* cache_shmlog.c */ diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c index a3cb805..7bd0a2e 100644 --- a/bin/varnishd/cache/cache_pool.c +++ b/bin/varnishd/cache/cache_pool.c @@ -93,6 +93,14 @@ struct poolsock { struct listen_sock *lsock; }; +struct task { + unsigned magic; +#define TASK_MAGIC 0x1f6f53f5 + VTAILQ_ENTRY(task) list; + taskfunc *func; + void *priv; +}; + /* Number of work requests queued in excess of worker threads available */ struct pool { 
@@ -108,6 +116,7 @@ struct pool { struct workerhead idle; VTAILQ_HEAD(, sess) queue; VTAILQ_HEAD(, poolsock) socks; + VTAILQ_HEAD(, task) taskqueue; unsigned nthr; unsigned lqueue; unsigned last_lqueue; @@ -200,8 +209,14 @@ Pool_Work_Thread(void *priv, struct worker *w) WS_Reset(w->ws, NULL); - w->sp = VTAILQ_FIRST(&pp->queue); - if (w->sp != NULL) { + w->tp = VTAILQ_FIRST(&pp->taskqueue); + if (w->tp == NULL) + w->sp = VTAILQ_FIRST(&pp->queue); + if (w->tp != NULL) { + /* Process background task, if any */ + /* MBGXXX: Accounting number of tasks */ + VTAILQ_REMOVE(&pp->taskqueue, w->tp, list); + } else if (w->sp != NULL) { /* Process queued requests, if any */ assert(pp->lqueue > 0); VTAILQ_REMOVE(&pp->queue, w->sp, poollist); @@ -233,12 +248,12 @@ Pool_Work_Thread(void *priv, struct worker *w) * If we got neither session or accepted a socket, we were * woken up to die to cull the herd. */ - if (w->sp == NULL && w->ws->r == NULL) + if (w->tp == NULL && w->sp == NULL && w->ws->r == NULL) break; Lck_Unlock(&pp->mtx); - if (w->sp == NULL) { + if (w->tp == NULL && w->sp == NULL) { /* Turn accepted socket into a session */ assert(w->ws->r != NULL); w->sp = SES_New(w, pp->sesspool); @@ -250,19 +265,26 @@ Pool_Work_Thread(void *priv, struct worker *w) } assert(w->ws->r == NULL); - if (w->sp != NULL) { - CHECK_OBJ_NOTNULL(w->sp, SESS_MAGIC); - + if (w->tp != NULL || w->sp != NULL) { stats_clean = 0; w->lastused = NAN; w->storage_hint = NULL; - AZ(w->sp->wrk); - THR_SetSession(w->sp); - w->sp->wrk = w; - CNT_Session(w->sp); - THR_SetSession(NULL); - w->sp = NULL; + if (w->tp != NULL) { + AZ(w->sp); + CHECK_OBJ_NOTNULL(w->tp, TASK_MAGIC); + AN(w->tp->func); + (w->tp->func)(w, w->tp->priv); + FREE_OBJ(w->tp); + } else { + CHECK_OBJ_NOTNULL(w->sp, SESS_MAGIC); + AZ(w->sp->wrk); + THR_SetSession(w->sp); + w->sp->wrk = w; + CNT_Session(w->sp); + THR_SetSession(NULL); + w->sp = NULL; + } WS_Assert(w->ws); AZ(w->busyobj); @@ -320,6 +342,28 @@ pool_queue(struct pool *pp, struct 
sess *sp) return (0); } +static void +pool_queue_task(struct pool *pp, struct task *tp) +{ + struct worker *w; + + Lck_Lock(&pp->mtx); + + /* If there are idle threads, we tickle the first one into action */ + w = VTAILQ_FIRST(&pp->idle); + if (w != NULL) { + VTAILQ_REMOVE(&pp->idle, w, list); + Lck_Unlock(&pp->mtx); + w->tp = tp; + AZ(pthread_cond_signal(&w->cond)); + return; + } + + VTAILQ_INSERT_TAIL(&pp->taskqueue, tp, list); + Lck_Unlock(&pp->mtx); + AZ(pthread_cond_signal(&pp->herder_cond)); +} + /*--------------------------------------------------------------------*/ int @@ -349,6 +393,22 @@ Pool_Schedule(struct pool *pp, struct sess *sp) return (1); } +void +Pool_Schedule_Task(struct pool *pp, taskfunc *func, void *priv) +{ + struct task *tp; + + CHECK_OBJ_NOTNULL(pp, POOL_MAGIC); + AN(func); + + ALLOC_OBJ(tp, TASK_MAGIC); + AN(tp); + tp->func = func; + tp->priv = priv; + + pool_queue_task(pp, tp); +} + /*-------------------------------------------------------------------- * Wait for another request */ @@ -517,6 +577,7 @@ pool_mkpool(void) XXXAN(pp); Lck_New(&pp->mtx, lck_wq); + VTAILQ_INIT(&pp->taskqueue); VTAILQ_INIT(&pp->queue); VTAILQ_INIT(&pp->idle); VTAILQ_INIT(&pp->socks); diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c index 21d7f51..f0a4436 100644 --- a/bin/varnishd/cache/cache_session.c +++ b/bin/varnishd/cache/cache_session.c @@ -265,6 +265,25 @@ SES_Schedule(struct sess *sp) return (0); } +void +SES_Schedule_Task(struct sess *sp, taskfunc *func, void *priv) +{ + struct sessmem *sm; + struct sesspool *pp; + + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + + sm = sp->mem; + CHECK_OBJ_NOTNULL(sm, SESSMEM_MAGIC); + + pp = sm->pool; + CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC); + + AN(pp->pool); + + Pool_Schedule_Task(pp->pool, func, priv); +} + /*-------------------------------------------------------------------- * Handle a session (from waiter) * -- 1.7.4.1
From c50a997fdc1acf645181c55f66a6b28e7acbef9a Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland <[email protected]> Date: Wed, 14 Dec 2011 13:34:25 +0100 Subject: [PATCH 02/10] Make FetchBody take a busyobj struct as parameter instead of obj, and prepare the busyobj->fetch_obj before calling FetchBody. --- bin/varnishd/cache/cache.h | 2 +- bin/varnishd/cache/cache_center.c | 10 ++++++++-- bin/varnishd/cache/cache_fetch.c | 7 +++---- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index a54d903..c265ce4 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -738,7 +738,7 @@ struct storage *FetchStorage(struct worker *w, ssize_t sz); int FetchError(struct worker *w, const char *error); int FetchError2(struct worker *w, const char *error, const char *more); int FetchHdr(struct sess *sp, int need_host_hdr); -int FetchBody(struct worker *w, struct object *obj); +int FetchBody(struct worker *w, struct busyobj *bo); int FetchReqBody(struct sess *sp); void Fetch_Init(void); diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c index 8826ae2..335e87a 100644 --- a/bin/varnishd/cache/cache_center.c +++ b/bin/varnishd/cache/cache_center.c @@ -885,7 +885,10 @@ cnt_fetchbody(struct sess *sp) } /* Use unmodified headers*/ - i = FetchBody(wrk, wrk->obj); + AZ(wrk->busyobj->fetch_obj); + wrk->busyobj->fetch_obj = wrk->obj; + i = FetchBody(wrk, wrk->busyobj); + AZ(wrk->busyobj->fetch_obj); http_Setup(wrk->busyobj->bereq, NULL); http_Setup(wrk->busyobj->beresp, NULL); @@ -955,7 +958,10 @@ cnt_streambody(struct sess *sp) AssertObjCorePassOrBusy(wrk->obj->objcore); - i = FetchBody(wrk, wrk->obj); + AZ(wrk->busyobj->fetch_obj); + wrk->busyobj->fetch_obj = wrk->obj; + i = FetchBody(wrk, wrk->busyobj); + AZ(wrk->busyobj->fetch_obj); http_Setup(wrk->busyobj->bereq, NULL); http_Setup(wrk->busyobj->beresp, NULL); diff --git a/bin/varnishd/cache/cache_fetch.c 
b/bin/varnishd/cache/cache_fetch.c index 936dc30..dbfb44a 100644 --- a/bin/varnishd/cache/cache_fetch.c +++ b/bin/varnishd/cache/cache_fetch.c @@ -484,20 +484,19 @@ FetchHdr(struct sess *sp, int need_host_hdr) /*--------------------------------------------------------------------*/ int -FetchBody(struct worker *wrk, struct object *obj) +FetchBody(struct worker *wrk, struct busyobj *bo) { int cls; struct storage *st; int mklen; ssize_t cl; struct http_conn *htc; - struct busyobj *bo; + struct object *obj; CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); - bo = wrk->busyobj; CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); - AZ(bo->fetch_obj); CHECK_OBJ_NOTNULL(bo->vbc, VBC_MAGIC); + obj = bo->fetch_obj; CHECK_OBJ_NOTNULL(obj, OBJECT_MAGIC); CHECK_OBJ_NOTNULL(obj->http, HTTP_MAGIC); -- 1.7.4.1
From 4a6941aee0bbadfde4a48dbe2b7643ba94c18e72 Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland <[email protected]> Date: Thu, 15 Dec 2011 12:26:07 +0100 Subject: [PATCH 03/10] Add a (configurable through parameter stream_maxchunksize) chunk size limit when streaming. --- bin/varnishd/cache/cache_fetch.c | 2 ++ bin/varnishd/common/params.h | 1 + bin/varnishd/mgt/mgt_param.c | 9 +++++++++ 3 files changed, 12 insertions(+), 0 deletions(-) diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c index dbfb44a..eddb71e 100644 --- a/bin/varnishd/cache/cache_fetch.c +++ b/bin/varnishd/cache/cache_fetch.c @@ -189,6 +189,8 @@ FetchStorage(struct worker *wrk, ssize_t sz) l = sz; if (l == 0) l = cache_param->fetch_chunksize; + if (wrk->busyobj->do_stream && l > cache_param->stream_maxchunksize) + l = cache_param->stream_maxchunksize; st = STV_alloc(wrk, l); if (st == NULL) { (void)FetchError(wrk, "Could not get storage"); diff --git a/bin/varnishd/common/params.h b/bin/varnishd/common/params.h index 8198a78..cc4a6b8 100644 --- a/bin/varnishd/common/params.h +++ b/bin/varnishd/common/params.h @@ -97,6 +97,7 @@ struct params { /* Fetcher hints */ ssize_t fetch_chunksize; ssize_t fetch_maxchunksize; + ssize_t stream_maxchunksize; unsigned nuke_limit; #ifdef SENDFILE_WORKS diff --git a/bin/varnishd/mgt/mgt_param.c b/bin/varnishd/mgt/mgt_param.c index 1935344..190de40 100644 --- a/bin/varnishd/mgt/mgt_param.c +++ b/bin/varnishd/mgt/mgt_param.c @@ -832,6 +832,15 @@ static const struct parspec input_parspec[] = { "fragmentation.\n", EXPERIMENTAL, "256m", "bytes" }, + { "stream_maxchunksize", + tweak_bytes_u, + &mgt_param.stream_maxchunksize, 4 * 1024, UINT_MAX, + "The maximum chunksize we attempt to allocate from storage " + "when streaming. 
This also defines the intervals at which " + "the streaming clients receive notifications about new " + "data available.\n", + EXPERIMENTAL, + "256k", "bytes" }, #ifdef SENDFILE_WORKS { "sendfile_threshold", tweak_bytes, &mgt_param.sendfile_threshold, 0, HUGE_VAL, -- 1.7.4.1
From 42c2432c2c3f6157172b09ff2a7bd67a5a40a38b Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland <[email protected]> Date: Thu, 15 Dec 2011 13:00:20 +0100 Subject: [PATCH 04/10] Don't free the object store when fetch fails and streaming is in effect, as there might be threads reading this data. --- bin/varnishd/cache/cache_fetch.c | 13 ++++++++----- 1 files changed, 8 insertions(+), 5 deletions(-) diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c index eddb71e..a8cd23e 100644 --- a/bin/varnishd/cache/cache_fetch.c +++ b/bin/varnishd/cache/cache_fetch.c @@ -579,11 +579,14 @@ FetchBody(struct worker *wrk, struct busyobj *bo) if (cls < 0) { wrk->stats.fetch_failed++; - /* XXX: Wouldn't this store automatically be released ? */ - while (!VTAILQ_EMPTY(&obj->store)) { - st = VTAILQ_FIRST(&obj->store); - VTAILQ_REMOVE(&obj->store, st, list); - STV_free(st); + if (bo->do_stream == 0) { + /* XXX: Wouldn't this store automatically be + * released ? */ + while (!VTAILQ_EMPTY(&obj->store)) { + st = VTAILQ_FIRST(&obj->store); + VTAILQ_REMOVE(&obj->store, st, list); + STV_free(st); + } } VDI_CloseFd(wrk, &bo->vbc); obj->len = 0; -- 1.7.4.1
From 5030bae7b097407c4ace94e07a738e7895c3b3b4 Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland <[email protected]> Date: Thu, 15 Dec 2011 13:40:43 +0100 Subject: [PATCH 05/10] Add a condvar to struct vbo --- bin/varnishd/cache/cache_busyobj.c | 3 +++ 1 files changed, 3 insertions(+), 0 deletions(-) diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c index ab4db76..9080aa3 100644 --- a/bin/varnishd/cache/cache_busyobj.c +++ b/bin/varnishd/cache/cache_busyobj.c @@ -42,6 +42,7 @@ struct vbo { unsigned magic; #define VBO_MAGIC 0xde3d8223 struct lock mtx; + pthread_cond_t cond; unsigned refcount; uint16_t nhttp; struct busyobj bo; @@ -80,6 +81,7 @@ vbo_New(void) vbo->magic = VBO_MAGIC; vbo->nhttp = nhttp; Lck_New(&vbo->mtx, lck_busyobj); + AZ(pthread_cond_init(&vbo->cond, NULL)); return (vbo); } @@ -94,6 +96,7 @@ VBO_Free(struct vbo **vbop) CHECK_OBJ_NOTNULL(vbo, VBO_MAGIC); AZ(vbo->refcount); Lck_Delete(&vbo->mtx); + AZ(pthread_cond_destroy(&vbo->cond)); FREE_OBJ(vbo); } -- 1.7.4.1
From ee4544fc8087f389a3affa98b28ade9a2f4441b4 Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland <[email protected]> Date: Thu, 15 Dec 2011 15:04:56 +0100 Subject: [PATCH 06/10] Add stream data synchronization functions to cache_busyobj.c in preparation for threaded streaming. VBO_StreamData is called by the fetch to update the busyobj with how much data is available. VBO_StreamSync is called by the delivery to update its local stream_ctx struct with the new pointers. --- bin/varnishd/cache/cache.h | 10 ++++++++++ bin/varnishd/cache/cache_busyobj.c | 31 +++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 0 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index c265ce4..fc43010 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -273,6 +273,10 @@ struct stream_ctx { /* First byte of storage if we free it as we go (pass) */ ssize_t stream_front; + struct storage *stream_frontchunk; + + /* Max byte we can stream */ + ssize_t stream_max; }; /*--------------------------------------------------------------------*/ @@ -525,6 +529,10 @@ struct busyobj { unsigned do_gzip; unsigned do_gunzip; unsigned do_stream; + + /* Stream stuff */ + ssize_t stream_max; + struct storage *stream_frontchunk; }; /* Object structure --------------------------------------------------*/ @@ -703,6 +711,8 @@ struct busyobj *VBO_GetBusyObj(struct worker *wrk); void VBO_RefBusyObj(const struct busyobj *busyobj); void VBO_DerefBusyObj(struct worker *wrk, struct busyobj **busyobj); void VBO_Free(struct vbo **vbo); +void VBO_StreamData(struct busyobj *busyobj); +void VBO_StreamSync(struct worker *wrk); /* cache_center.c [CNT] */ void CNT_Session(struct sess *sp); diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c index 9080aa3..988ae07 100644 --- a/bin/varnishd/cache/cache_busyobj.c +++ b/bin/varnishd/cache/cache_busyobj.c @@ -200,3 +200,34 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj
**pbo) } } } + +/* Signal additional data available */ +void +VBO_StreamData(struct busyobj *busyobj) +{ + CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC); + CHECK_OBJ_NOTNULL(busyobj->fetch_obj, OBJECT_MAGIC); + + assert(busyobj->fetch_obj->len >= busyobj->stream_max); + if (busyobj->fetch_obj->len > busyobj->stream_max) { + busyobj->stream_max = busyobj->fetch_obj->len; + } +} + +/* Sync the client's stream_ctx with the busyobj */ +void +VBO_StreamSync(struct worker *wrk) +{ + CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); + CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC); + CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC); + CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC); + + wrk->sctx->stream_max = wrk->busyobj->stream_max; + if (wrk->obj->objcore == NULL || + (wrk->obj->objcore->flags & OC_F_PASS)) { + /* Give notice to backend fetch that we are finished + * with all chunks before this one */ + wrk->busyobj->stream_frontchunk = wrk->sctx->stream_frontchunk; + } +} -- 1.7.4.1
From 3ba48dcfb5f4231dfa3147d5d63d9dda9d993ad5 Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland <[email protected]> Date: Thu, 15 Dec 2011 15:09:47 +0100 Subject: [PATCH 07/10] Rework RES_StreamPoll to use the VBO_StreamData and VBO_StreamSync functions, and make the object data access thread safe (given proper locking in VBO_StreamData and VBO_StreamSync that is coming in a later patch). --- bin/varnishd/cache/cache.h | 4 +- bin/varnishd/cache/cache_response.c | 83 +++++++++++++++++++++------------- 2 files changed, 53 insertions(+), 34 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index fc43010..320f9ca 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -271,7 +271,7 @@ struct stream_ctx { /* Next byte we will take from storage */ ssize_t stream_next; - /* First byte of storage if we free it as we go (pass) */ + /* Point in storage chunk chain we have reached */ ssize_t stream_front; struct storage *stream_frontchunk; @@ -950,7 +950,7 @@ void RES_BuildHttp(const struct sess *sp); void RES_WriteObj(struct sess *sp); void RES_StreamStart(struct sess *sp); void RES_StreamEnd(struct sess *sp); -void RES_StreamPoll(struct worker *); +void RES_StreamPoll(struct worker *wrk); /* cache_vary.c */ struct vsb *VRY_Create(const struct sess *sp, const struct http *hp); diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c index 7c63d51..5038395 100644 --- a/bin/varnishd/cache/cache_response.c +++ b/bin/varnishd/cache/cache_response.c @@ -357,55 +357,74 @@ RES_StreamStart(struct sess *sp) } void -RES_StreamPoll(struct worker *w) +RES_StreamPoll(struct worker *wrk) { struct stream_ctx *sctx; struct storage *st; - ssize_t l, l2; + struct object *fetch_obj; + ssize_t l, l2, stlen; void *ptr; - CHECK_OBJ_NOTNULL(w, WORKER_MAGIC); - CHECK_OBJ_NOTNULL(w->busyobj->fetch_obj, OBJECT_MAGIC); - sctx = w->sctx; + CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); + CHECK_OBJ_NOTNULL(wrk->obj, 
OBJECT_MAGIC); + sctx = wrk->sctx; CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC); - if (w->busyobj->fetch_obj->len == sctx->stream_next) + + VBO_StreamData(wrk->busyobj); + VBO_StreamSync(wrk); + + if (sctx->stream_max == sctx->stream_next) return; - assert(w->busyobj->fetch_obj->len > sctx->stream_next); + assert(sctx->stream_max > sctx->stream_next); + l = sctx->stream_front; - VTAILQ_FOREACH(st, &w->busyobj->fetch_obj->store, list) { - if (st->len + l <= sctx->stream_next) { - l += st->len; + st = sctx->stream_frontchunk; + if (st == NULL) + st = VTAILQ_FIRST(&wrk->obj->store); + for (; st != NULL; st = VTAILQ_NEXT(st, list)) { + CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC); + sctx->stream_front = l; + sctx->stream_frontchunk = st; + stlen = st->len; + if (l + stlen <= sctx->stream_next) { + l += stlen; continue; } - l2 = st->len + l - sctx->stream_next; + assert(l + stlen > sctx->stream_next); + l2 = l + stlen - sctx->stream_next; + if (sctx->stream_next + l2 > sctx->stream_max) + l2 = sctx->stream_max - sctx->stream_next; ptr = st->ptr + (sctx->stream_next - l); - if (w->res_mode & RES_GUNZIP) { - (void)VGZ_WrwGunzip(w, sctx->vgz, ptr, l2, + if (wrk->res_mode & RES_GUNZIP) { + (void)VGZ_WrwGunzip(wrk, sctx->vgz, ptr, l2, sctx->obuf, sctx->obuf_len, &sctx->obuf_ptr); } else { - (void)WRW_Write(w, ptr, l2); + (void)WRW_Write(wrk, ptr, l2); } - l += st->len; sctx->stream_next += l2; + if (sctx->stream_next == sctx->stream_max) + break; + AN(VTAILQ_NEXT(st, list)); + l += st->len; } - if (!(w->res_mode & RES_GUNZIP)) - (void)WRW_Flush(w); + if (!(wrk->res_mode & RES_GUNZIP)) + (void)WRW_Flush(wrk); - if (w->busyobj->fetch_obj->objcore == NULL || - (w->busyobj->fetch_obj->objcore->flags & OC_F_PASS)) { - /* - * This is a pass object, release storage as soon as we - * have delivered it. 
- */ - while (1) { - st = VTAILQ_FIRST(&w->busyobj->fetch_obj->store); - if (st == NULL || - sctx->stream_front + st->len > sctx->stream_next) - break; - VTAILQ_REMOVE(&w->busyobj->fetch_obj->store, st, list); - sctx->stream_front += st->len; - STV_free(st); - } + if (wrk->busyobj->stream_frontchunk == NULL) + return; + + /* It's a pass - remove chunks already delivered */ + fetch_obj = wrk->busyobj->fetch_obj; + CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC); + assert(fetch_obj->objcore == NULL || + (fetch_obj->objcore->flags & OC_F_PASS)); + while (1) { + st = VTAILQ_FIRST(&fetch_obj->store); + if (st == NULL || st == wrk->busyobj->stream_frontchunk) + break; + CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC); + VTAILQ_REMOVE(&fetch_obj->store, st, list); + STV_free(st); } } -- 1.7.4.1
From 9b4eebe1dc52c64230c77bb179b91c0da158ab55 Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland <[email protected]> Date: Thu, 15 Dec 2011 15:39:08 +0100 Subject: [PATCH 08/10] Add VBO_StreamStopped and VBO_StreamWait thread synchronization functions. --- bin/varnishd/cache/cache.h | 6 ++++++ bin/varnishd/cache/cache_busyobj.c | 21 +++++++++++++++++++++ 2 files changed, 27 insertions(+), 0 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index 320f9ca..c3314f1 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -277,6 +277,9 @@ struct stream_ctx { /* Max byte we can stream */ ssize_t stream_max; + + /* Backend fetch has finished */ + unsigned stream_stopped; }; /*--------------------------------------------------------------------*/ @@ -533,6 +536,7 @@ struct busyobj { /* Stream stuff */ ssize_t stream_max; struct storage *stream_frontchunk; + unsigned stream_stopped; }; /* Object structure --------------------------------------------------*/ @@ -711,6 +715,8 @@ struct busyobj *VBO_GetBusyObj(struct worker *wrk); void VBO_RefBusyObj(const struct busyobj *busyobj); void VBO_DerefBusyObj(struct worker *wrk, struct busyobj **busyobj); void VBO_Free(struct vbo **vbo); +void VBO_StreamStopped(struct busyobj *busyobj); +void VBO_StreamWait(struct busyobj *busyobj); void VBO_StreamData(struct busyobj *busyobj); void VBO_StreamSync(struct worker *wrk); diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c index 988ae07..c8f61c6 100644 --- a/bin/varnishd/cache/cache_busyobj.c +++ b/bin/varnishd/cache/cache_busyobj.c @@ -201,6 +201,26 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo) } } +/* Signal that the fetch thread has stopped */ +void +VBO_StreamStopped(struct busyobj *busyobj) +{ + Lck_Lock(&busyobj->vbo->mtx); + busyobj->stream_stopped = 1; + AZ(pthread_cond_signal(&busyobj->vbo->cond)); + Lck_Unlock(&busyobj->vbo->mtx); +} + +/* Wait for the stopped state */ +void 
+VBO_StreamWait(struct busyobj *busyobj) +{ + Lck_Lock(&busyobj->vbo->mtx); + while (busyobj->stream_stopped == 0) + Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx, NULL); + Lck_Unlock(&busyobj->vbo->mtx); +} + /* Signal additional data available */ void VBO_StreamData(struct busyobj *busyobj) @@ -223,6 +243,7 @@ VBO_StreamSync(struct worker *wrk) CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC); CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC); + wrk->sctx->stream_stopped = wrk->busyobj->stream_stopped; wrk->sctx->stream_max = wrk->busyobj->stream_max; if (wrk->obj->objcore == NULL || (wrk->obj->objcore->flags & OC_F_PASS)) { -- 1.7.4.1
From 1dbc27927e728123962ef88749cb7309167de7ef Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland <[email protected]> Date: Thu, 15 Dec 2011 15:47:59 +0100 Subject: [PATCH 09/10] Use background thread fetching when streaming --- bin/varnishd/cache/cache.h | 4 ++ bin/varnishd/cache/cache_busyobj.c | 13 +++++- bin/varnishd/cache/cache_center.c | 41 ++++------------ bin/varnishd/cache/cache_fetch.c | 37 ++++++++++++++ bin/varnishd/cache/cache_response.c | 92 ++++++++++++++++++++++++----------- 5 files changed, 127 insertions(+), 60 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index c3314f1..31993cf 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -755,6 +755,8 @@ int FetchError(struct worker *w, const char *error); int FetchError2(struct worker *w, const char *error, const char *more); int FetchHdr(struct sess *sp, int need_host_hdr); int FetchBody(struct worker *w, struct busyobj *bo); +void FetchBodyBackground(struct sess *sp, struct busyobj *bo); +void FetchBodyWait(struct busyobj *bo); int FetchReqBody(struct sess *sp); void Fetch_Init(void); @@ -955,6 +957,8 @@ void WSL_Flush(struct worker *w, int overflow); void RES_BuildHttp(const struct sess *sp); void RES_WriteObj(struct sess *sp); void RES_StreamStart(struct sess *sp); +void RES_StreamBody(struct sess *sp); +void RES_StreamWrite(struct sess *sp); void RES_StreamEnd(struct sess *sp); void RES_StreamPoll(struct worker *wrk); diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c index c8f61c6..02a85be 100644 --- a/bin/varnishd/cache/cache_busyobj.c +++ b/bin/varnishd/cache/cache_busyobj.c @@ -228,13 +228,17 @@ VBO_StreamData(struct busyobj *busyobj) CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC); CHECK_OBJ_NOTNULL(busyobj->fetch_obj, OBJECT_MAGIC); + Lck_Lock(&busyobj->vbo->mtx); assert(busyobj->fetch_obj->len >= busyobj->stream_max); if (busyobj->fetch_obj->len > busyobj->stream_max) { busyobj->stream_max = 
busyobj->fetch_obj->len; + AZ(pthread_cond_signal(&busyobj->vbo->cond)); } + Lck_Unlock(&busyobj->vbo->mtx); } -/* Sync the client's stream_ctx with the busyobj */ +/* Sync the client's stream_ctx with the busyobj, and block on no more + * data available */ void VBO_StreamSync(struct worker *wrk) { @@ -243,6 +247,12 @@ VBO_StreamSync(struct worker *wrk) CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC); CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC); + Lck_Lock(&wrk->busyobj->vbo->mtx); + while (!wrk->busyobj->stream_stopped && + wrk->sctx->stream_max == wrk->busyobj->stream_max) { + Lck_CondWait(&wrk->busyobj->vbo->cond, &wrk->busyobj->vbo->mtx, + NULL); + } wrk->sctx->stream_stopped = wrk->busyobj->stream_stopped; wrk->sctx->stream_max = wrk->busyobj->stream_max; if (wrk->obj->objcore == NULL || @@ -251,4 +261,5 @@ VBO_StreamSync(struct worker *wrk) * with all chunks before this one */ wrk->busyobj->stream_frontchunk = wrk->sctx->stream_frontchunk; } + Lck_Unlock(&wrk->busyobj->vbo->mtx); } diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c index 335e87a..c5e32b5 100644 --- a/bin/varnishd/cache/cache_center.c +++ b/bin/varnishd/cache/cache_center.c @@ -932,58 +932,37 @@ DOT streambody -> DONE [style=bold,color=cyan] static int cnt_streambody(struct sess *sp) { - int i; - struct stream_ctx sctx; - uint8_t obuf[sp->wrk->res_mode & RES_GUNZIP ? 
- cache_param->gzip_stack_buffer : 1]; struct worker *wrk; CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); wrk = sp->wrk; CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); - CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC); - memset(&sctx, 0, sizeof sctx); - sctx.magic = STREAM_CTX_MAGIC; - AZ(wrk->sctx); - wrk->sctx = &sctx; - - if (wrk->res_mode & RES_GUNZIP) { - sctx.vgz = VGZ_NewUngzip(wrk, "U S -"); - sctx.obuf = obuf; - sctx.obuf_len = sizeof (obuf); - } - - RES_StreamStart(sp); AssertObjCorePassOrBusy(wrk->obj->objcore); + RES_StreamStart(sp); - AZ(wrk->busyobj->fetch_obj); wrk->busyobj->fetch_obj = wrk->obj; - i = FetchBody(wrk, wrk->busyobj); - AZ(wrk->busyobj->fetch_obj); + FetchBodyBackground(sp, wrk->busyobj); + RES_StreamBody(sp); - http_Setup(wrk->busyobj->bereq, NULL); - http_Setup(wrk->busyobj->beresp, NULL); - wrk->busyobj->vfp = NULL; - AZ(wrk->busyobj->vbc); - AN(sp->director); + VBO_StreamWait(wrk->busyobj); + AN(wrk->busyobj->stream_stopped); + AZ(wrk->busyobj->fetch_obj); - if (!i && wrk->obj->objcore != NULL) { + if (wrk->busyobj->fetch_failed) { + sp->doclose = "Stream error"; + } else if (wrk->obj->objcore != NULL) { EXP_Insert(wrk->obj); AN(wrk->obj->objcore); AN(wrk->obj->objcore->ban); HSH_Unbusy(wrk); - } else { - sp->doclose = "Stream error"; } - wrk->acct_tmp.fetch++; + sp->wrk->acct_tmp.fetch++; sp->director = NULL; sp->restarts = 0; RES_StreamEnd(sp); - if (wrk->res_mode & RES_GUNZIP) - (void)VGZ_Destroy(&sctx.vgz, sp->vsl_id); wrk->sctx = NULL; assert(WRW_IsReleased(wrk)); diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c index a8cd23e..fb5d72e 100644 --- a/bin/varnishd/cache/cache_fetch.c +++ b/bin/varnishd/cache/cache_fetch.c @@ -485,6 +485,43 @@ FetchHdr(struct sess *sp, int need_host_hdr) /*--------------------------------------------------------------------*/ +static void +fetch_body_task(struct worker *wrk, void *priv) +{ + struct busyobj *bo; + struct object *obj; + + CAST_OBJ_NOTNULL(bo, priv, BUSYOBJ_MAGIC); + + 
CHECK_OBJ_NOTNULL(bo->fetch_obj, OBJECT_MAGIC); + CHECK_OBJ_NOTNULL(bo->vbc, VBC_MAGIC); + + obj = bo->fetch_obj; + wrk->busyobj = bo; + + bo->fetch_failed = FetchBody(wrk, bo); + AZ(bo->fetch_obj); + AZ(bo->vbc); + http_Setup(bo->bereq, NULL); + http_Setup(bo->beresp, NULL); + bo->vfp = NULL; + wrk->busyobj = NULL; + AZ(wrk->obj); + + VBO_StreamStopped(bo); + VBO_DerefBusyObj(wrk, &bo); +} + +void +FetchBodyBackground(struct sess *sp, struct busyobj *bo) +{ + CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); + CHECK_OBJ_NOTNULL(bo->fetch_obj, OBJECT_MAGIC); + AssertObjCorePassOrBusy(bo->fetch_obj->objcore); + VBO_RefBusyObj(bo); + SES_Schedule_Task(sp, fetch_body_task, bo); +} + int FetchBody(struct worker *wrk, struct busyobj *bo) { diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c index 5038395..c6f14f2 100644 --- a/bin/varnishd/cache/cache_response.c +++ b/bin/varnishd/cache/cache_response.c @@ -329,11 +329,6 @@ RES_WriteObj(struct sess *sp) void RES_StreamStart(struct sess *sp) { - struct stream_ctx *sctx; - - sctx = sp->wrk->sctx; - CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC); - AZ(sp->wrk->res_mode & RES_ESI_CHILD); AN(sp->wantbody); @@ -357,22 +352,57 @@ RES_StreamStart(struct sess *sp) } void -RES_StreamPoll(struct worker *wrk) +RES_StreamBody(struct sess *sp) +{ + struct stream_ctx sctx; + struct busyobj *bo; + uint8_t obuf[sp->wrk->res_mode & RES_GUNZIP ? 
+ cache_param->gzip_stack_buffer : 1]; + + bo = sp->wrk->busyobj; + CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); + AN(sp->wantbody); + + memset(&sctx, 0, sizeof sctx); + sctx.magic = STREAM_CTX_MAGIC; + AZ(sp->wrk->sctx); + sp->wrk->sctx = &sctx; + + if (sp->wrk->res_mode & RES_GUNZIP) { + sctx.vgz = VGZ_NewUngzip(sp->wrk, "U S -"); + sctx.obuf = obuf; + sctx.obuf_len = sizeof (obuf); + } + + while (!sctx.stream_stopped || sctx.stream_next < sctx.stream_max) { + VBO_StreamSync(sp->wrk); + RES_StreamWrite(sp); + } + + if (sp->wrk->res_mode & RES_GUNZIP) { + if (sctx.obuf_ptr > 0) + (void)WRW_Write(sp->wrk, sctx.obuf, sctx.obuf_ptr); + VGZ_Destroy(&sctx.vgz, sp->vsl_id); + } + sp->wrk->sctx = NULL; +} + +void +RES_StreamWrite(struct sess *sp) { + struct worker *wrk; struct stream_ctx *sctx; struct storage *st; - struct object *fetch_obj; ssize_t l, l2, stlen; void *ptr; + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + wrk = sp->wrk; CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC); sctx = wrk->sctx; CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC); - VBO_StreamData(wrk->busyobj); - VBO_StreamSync(wrk); - if (sctx->stream_max == sctx->stream_next) return; assert(sctx->stream_max > sctx->stream_next); @@ -409,11 +439,34 @@ RES_StreamPoll(struct worker *wrk) } if (!(wrk->res_mode & RES_GUNZIP)) (void)WRW_Flush(wrk); +} + +void +RES_StreamEnd(struct sess *sp) +{ + if (sp->wrk->res_mode & RES_CHUNKED && + !(sp->wrk->res_mode & RES_ESI_CHILD)) + WRW_EndChunk(sp->wrk); + if (WRW_FlushRelease(sp->wrk)) + SES_Close(sp, "remote closed"); +} + +void +RES_StreamPoll(struct worker *wrk) +{ + struct object *fetch_obj; + struct storage *st; + + CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC); + + VBO_StreamData(wrk->busyobj); if (wrk->busyobj->stream_frontchunk == NULL) return; - /* It's a pass - remove chunks already delivered */ + /* It's a pass - remove chunks already delivered. 
Should be OK + * to do lock-free, as we are not fiddling pointers of any + * storage chunk passed busyobj->stream_frontchunk */ fetch_obj = wrk->busyobj->fetch_obj; CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC); assert(fetch_obj->objcore == NULL || @@ -427,20 +480,3 @@ RES_StreamPoll(struct worker *wrk) STV_free(st); } } - -void -RES_StreamEnd(struct sess *sp) -{ - struct stream_ctx *sctx; - - sctx = sp->wrk->sctx; - CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC); - - if (sp->wrk->res_mode & RES_GUNZIP && sctx->obuf_ptr > 0) - (void)WRW_Write(sp->wrk, sctx->obuf, sctx->obuf_ptr); - if (sp->wrk->res_mode & RES_CHUNKED && - !(sp->wrk->res_mode & RES_ESI_CHILD)) - WRW_EndChunk(sp->wrk); - if (WRW_FlushRelease(sp->wrk)) - SES_Close(sp, "remote closed"); -} -- 1.7.4.1
From 8d07e52a23fd42582ff1a6adf1c9335138b509e4 Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland <[email protected]> Date: Thu, 15 Dec 2011 15:53:19 +0100 Subject: [PATCH 10/10] Add a couple of streaming test cases --- bin/varnishtest/tests/t00002.vtc | 38 ++++++++++++++++++++++++++++++++++++++ bin/varnishtest/tests/t00003.vtc | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 0 deletions(-) create mode 100644 bin/varnishtest/tests/t00002.vtc create mode 100644 bin/varnishtest/tests/t00003.vtc diff --git a/bin/varnishtest/tests/t00002.vtc b/bin/varnishtest/tests/t00002.vtc new file mode 100644 index 0000000..440577c --- /dev/null +++ b/bin/varnishtest/tests/t00002.vtc @@ -0,0 +1,38 @@ +varnishtest "Test streaming recepient in pass mode" + +server s1 { + rxreq + txresp -nolen -hdr "Transfer-encoding: chunked" + chunked "<1>------------------------<1>\n" + sema r1 sync 2 + chunked "<2>------------------------<2>\n" + sema r2 sync 2 + chunkedlen 0 +} -start + +varnish v1 -vcl+backend { + sub vcl_recv { + return (pass); + } + + sub vcl_fetch { + set beresp.do_stream = true; + } +} -start + +client c1 { + txreq -hdr "foo: /foo" + rxresp -no_obj + + rxchunk + expect resp.chunklen == 31 + sema r1 sync 2 + + rxchunk + expect resp.chunklen == 31 + sema r2 sync 2 + + rxchunk + expect resp.chunklen == 0 + expect resp.bodylen == 62 +} -run diff --git a/bin/varnishtest/tests/t00003.vtc b/bin/varnishtest/tests/t00003.vtc new file mode 100644 index 0000000..6681595 --- /dev/null +++ b/bin/varnishtest/tests/t00003.vtc @@ -0,0 +1,33 @@ +varnishtest "Test failed streamed delivery not entering cache" + +server s1 { + rxreq + txresp -nolen -hdr "Transfer-encoding: chunked" + chunked "<1>------------------------<1>\n" + chunked "<2>------------------------<2>\n" + accept + rxreq + txresp -nolen -hdr "Transfer-encoding: chunked" + chunked "<1>------------------------<1>\n" + chunked "<2>------------------------<2>\n" + chunked "<3>------------------------<3>\n" 
+ chunkedlen 0 +} -start + +varnish v1 -vcl+backend { + sub vcl_fetch { + set beresp.do_stream = true; + } +} -start + +client c1 { + txreq + rxresp + delay 1 +} -run + +client c2 { + txreq + rxresp + expect resp.bodylen == 93 +} -run -- 1.7.4.1
_______________________________________________ varnish-dev mailing list [email protected] https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev
