Hi Poul-Henning & varnish-dev,

Please find attached a series of patches implementing the background fetch
thread streaming. Multi-client streaming is coming later. Any comments
appreciated.

Regards,
Martin Blix Grydeland

-- 
Martin Blix Grydeland
Varnish Software AS
From 780842c7056e0e69bccfd7f5c1752d63c3dbfd74 Mon Sep 17 00:00:00 2001
From: Martin Blix Grydeland <[email protected]>
Date: Tue, 15 Nov 2011 12:03:56 +0100
Subject: [PATCH 01/10] Add task scheduling bits

---
 bin/varnishd/cache/cache.h         |    5 ++
 bin/varnishd/cache/cache_pool.c    |   87 ++++++++++++++++++++++++++++++-----
 bin/varnishd/cache/cache_session.c |   19 ++++++++
 3 files changed, 98 insertions(+), 13 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 9601270..a54d903 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -114,6 +114,7 @@ struct vrt_backend;
 struct vsb;
 struct waitinglist;
 struct worker;
+struct task;
 
 #define DIGEST_LEN		32
 
@@ -308,6 +309,7 @@ struct worker {
 	pthread_cond_t		cond;
 
 	VTAILQ_ENTRY(worker)	list;
+	struct task		*tp;
 	struct sess		*sp;
 
 	struct VCL_conf		*vcl;
@@ -870,6 +872,8 @@ void Pool_Init(void);
 void Pool_Work_Thread(void *priv, struct worker *w);
 void Pool_Wait(struct sess *sp);
 int Pool_Schedule(struct pool *pp, struct sess *sp);
+typedef void taskfunc(struct worker *, void *priv);
+void Pool_Schedule_Task(struct pool *pp, taskfunc *func, void *priv);
 
 #define WRW_IsReleased(w)	((w)->wrw.wfd == NULL)
 int WRW_Error(const struct worker *w);
@@ -893,6 +897,7 @@ void SES_Charge(struct sess *sp);
 struct sesspool *SES_NewPool(struct pool *pp);
 void SES_DeletePool(struct sesspool *sp, struct worker *wrk);
 int SES_Schedule(struct sess *sp);
+void SES_Schedule_Task(struct sess *sp, taskfunc *func, void *priv);
 
 
 /* cache_shmlog.c */
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index a3cb805..7bd0a2e 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -93,6 +93,14 @@ struct poolsock {
 	struct listen_sock		*lsock;
 };
 
+struct task {
+	unsigned		magic;
+#define TASK_MAGIC		0x1f6f53f5
+	VTAILQ_ENTRY(task)	list;
+	taskfunc		*func;
+	void			*priv;
+};
+
 /* Number of work requests queued in excess of worker threads available */
 
 struct pool {
@@ -108,6 +116,7 @@ struct pool {
 	struct workerhead	idle;
 	VTAILQ_HEAD(, sess)	queue;
 	VTAILQ_HEAD(, poolsock)	socks;
+	VTAILQ_HEAD(, task)	taskqueue;
 	unsigned		nthr;
 	unsigned		lqueue;
 	unsigned		last_lqueue;
@@ -200,8 +209,14 @@ Pool_Work_Thread(void *priv, struct worker *w)
 
 		WS_Reset(w->ws, NULL);
 
-		w->sp = VTAILQ_FIRST(&pp->queue);
-		if (w->sp != NULL) {
+		w->tp = VTAILQ_FIRST(&pp->taskqueue);
+		if (w->tp == NULL)
+			w->sp = VTAILQ_FIRST(&pp->queue);
+		if (w->tp != NULL) {
+			/* Process background task, if any */
+			/* MBGXXX: Accounting number of tasks */
+			VTAILQ_REMOVE(&pp->taskqueue, w->tp, list);
+		} else if (w->sp != NULL) {
 			/* Process queued requests, if any */
 			assert(pp->lqueue > 0);
 			VTAILQ_REMOVE(&pp->queue, w->sp, poollist);
@@ -233,12 +248,12 @@ Pool_Work_Thread(void *priv, struct worker *w)
 		 * If we got neither session or accepted a socket, we were
 		 * woken up to die to cull the herd.
 		 */
-		if (w->sp == NULL && w->ws->r == NULL)
+		if (w->tp == NULL && w->sp == NULL && w->ws->r == NULL)
 			break;
 
 		Lck_Unlock(&pp->mtx);
 
-		if (w->sp == NULL) {
+		if (w->tp == NULL && w->sp == NULL) {
 			/* Turn accepted socket into a session */
 			assert(w->ws->r != NULL);
 			w->sp = SES_New(w, pp->sesspool);
@@ -250,19 +265,26 @@ Pool_Work_Thread(void *priv, struct worker *w)
 		}
 		assert(w->ws->r == NULL);
 
-		if (w->sp != NULL) {
-			CHECK_OBJ_NOTNULL(w->sp, SESS_MAGIC);
-
+		if (w->tp != NULL || w->sp != NULL) {
 			stats_clean = 0;
 			w->lastused = NAN;
 			w->storage_hint = NULL;
 
-			AZ(w->sp->wrk);
-			THR_SetSession(w->sp);
-			w->sp->wrk = w;
-			CNT_Session(w->sp);
-			THR_SetSession(NULL);
-			w->sp = NULL;
+			if (w->tp != NULL) {
+				AZ(w->sp);
+				CHECK_OBJ_NOTNULL(w->tp, TASK_MAGIC);
+				AN(w->tp->func);
+				(w->tp->func)(w, w->tp->priv);
+				FREE_OBJ(w->tp);
+			} else {
+				CHECK_OBJ_NOTNULL(w->sp, SESS_MAGIC);
+				AZ(w->sp->wrk);
+				THR_SetSession(w->sp);
+				w->sp->wrk = w;
+				CNT_Session(w->sp);
+				THR_SetSession(NULL);
+				w->sp = NULL;
+			}
 
 			WS_Assert(w->ws);
 			AZ(w->busyobj);
@@ -320,6 +342,28 @@ pool_queue(struct pool *pp, struct sess *sp)
 	return (0);
 }
 
+static void
+pool_queue_task(struct pool *pp, struct task *tp)
+{
+	struct worker *w;
+
+	Lck_Lock(&pp->mtx);
+
+	/* If there are idle threads, we tickle the first one into action */
+	w = VTAILQ_FIRST(&pp->idle);
+	if (w != NULL) {
+		VTAILQ_REMOVE(&pp->idle, w, list);
+		Lck_Unlock(&pp->mtx);
+		w->tp = tp;
+		AZ(pthread_cond_signal(&w->cond));
+		return;
+	}
+
+	VTAILQ_INSERT_TAIL(&pp->taskqueue, tp, list);
+	Lck_Unlock(&pp->mtx);
+	AZ(pthread_cond_signal(&pp->herder_cond));
+}
+
 /*--------------------------------------------------------------------*/
 
 int
@@ -349,6 +393,22 @@ Pool_Schedule(struct pool *pp, struct sess *sp)
 	return (1);
 }
 
+void
+Pool_Schedule_Task(struct pool *pp, taskfunc *func, void *priv)
+{
+	struct task *tp;
+
+	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+	AN(func);
+
+	ALLOC_OBJ(tp, TASK_MAGIC);
+	AN(tp);
+	tp->func = func;
+	tp->priv = priv;
+
+	pool_queue_task(pp, tp);
+}
+
 /*--------------------------------------------------------------------
  * Wait for another request
  */
@@ -517,6 +577,7 @@ pool_mkpool(void)
 	XXXAN(pp);
 	Lck_New(&pp->mtx, lck_wq);
 
+	VTAILQ_INIT(&pp->taskqueue);
 	VTAILQ_INIT(&pp->queue);
 	VTAILQ_INIT(&pp->idle);
 	VTAILQ_INIT(&pp->socks);
diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c
index 21d7f51..f0a4436 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -265,6 +265,25 @@ SES_Schedule(struct sess *sp)
 	return (0);
 }
 
+void
+SES_Schedule_Task(struct sess *sp, taskfunc *func, void *priv)
+{
+	struct sessmem *sm;
+	struct sesspool *pp;
+
+	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+
+	sm = sp->mem;
+	CHECK_OBJ_NOTNULL(sm, SESSMEM_MAGIC);
+
+	pp = sm->pool;
+	CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
+
+	AN(pp->pool);
+
+	Pool_Schedule_Task(pp->pool, func, priv);
+}
+
 /*--------------------------------------------------------------------
  * Handle a session (from waiter)
  *
-- 
1.7.4.1

From c50a997fdc1acf645181c55f66a6b28e7acbef9a Mon Sep 17 00:00:00 2001
From: Martin Blix Grydeland <[email protected]>
Date: Wed, 14 Dec 2011 13:34:25 +0100
Subject: [PATCH 02/10] Make FetchBody take a busyobj struct as parameter instead of obj, and
 prepare the busyobj->fetch_obj before calling FetchBody.

---
 bin/varnishd/cache/cache.h        |    2 +-
 bin/varnishd/cache/cache_center.c |   10 ++++++++--
 bin/varnishd/cache/cache_fetch.c  |    7 +++----
 3 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index a54d903..c265ce4 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -738,7 +738,7 @@ struct storage *FetchStorage(struct worker *w, ssize_t sz);
 int FetchError(struct worker *w, const char *error);
 int FetchError2(struct worker *w, const char *error, const char *more);
 int FetchHdr(struct sess *sp, int need_host_hdr);
-int FetchBody(struct worker *w, struct object *obj);
+int FetchBody(struct worker *w, struct busyobj *bo);
 int FetchReqBody(struct sess *sp);
 void Fetch_Init(void);
 
diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c
index 8826ae2..335e87a 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -885,7 +885,10 @@ cnt_fetchbody(struct sess *sp)
 	}
 
 	/* Use unmodified headers*/
-	i = FetchBody(wrk, wrk->obj);
+	AZ(wrk->busyobj->fetch_obj);
+	wrk->busyobj->fetch_obj = wrk->obj;
+	i = FetchBody(wrk, wrk->busyobj);
+	AZ(wrk->busyobj->fetch_obj);
 
 	http_Setup(wrk->busyobj->bereq, NULL);
 	http_Setup(wrk->busyobj->beresp, NULL);
@@ -955,7 +958,10 @@ cnt_streambody(struct sess *sp)
 
 	AssertObjCorePassOrBusy(wrk->obj->objcore);
 
-	i = FetchBody(wrk, wrk->obj);
+	AZ(wrk->busyobj->fetch_obj);
+	wrk->busyobj->fetch_obj = wrk->obj;
+	i = FetchBody(wrk, wrk->busyobj);
+	AZ(wrk->busyobj->fetch_obj);
 
 	http_Setup(wrk->busyobj->bereq, NULL);
 	http_Setup(wrk->busyobj->beresp, NULL);
diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c
index 936dc30..dbfb44a 100644
--- a/bin/varnishd/cache/cache_fetch.c
+++ b/bin/varnishd/cache/cache_fetch.c
@@ -484,20 +484,19 @@ FetchHdr(struct sess *sp, int need_host_hdr)
 /*--------------------------------------------------------------------*/
 
 int
-FetchBody(struct worker *wrk, struct object *obj)
+FetchBody(struct worker *wrk, struct busyobj *bo)
 {
 	int cls;
 	struct storage *st;
 	int mklen;
 	ssize_t cl;
 	struct http_conn *htc;
-	struct busyobj *bo;
+	struct object *obj;
 
 	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
-	bo = wrk->busyobj;
 	CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
-	AZ(bo->fetch_obj);
 	CHECK_OBJ_NOTNULL(bo->vbc, VBC_MAGIC);
+	obj = bo->fetch_obj;
 	CHECK_OBJ_NOTNULL(obj, OBJECT_MAGIC);
 	CHECK_OBJ_NOTNULL(obj->http, HTTP_MAGIC);
 
-- 
1.7.4.1

From 4a6941aee0bbadfde4a48dbe2b7643ba94c18e72 Mon Sep 17 00:00:00 2001
From: Martin Blix Grydeland <[email protected]>
Date: Thu, 15 Dec 2011 12:26:07 +0100
Subject: [PATCH 03/10] Add a (configurable through parameter stream_maxchunksize) chunk size limit when streaming.

---
 bin/varnishd/cache/cache_fetch.c |    2 ++
 bin/varnishd/common/params.h     |    1 +
 bin/varnishd/mgt/mgt_param.c     |    9 +++++++++
 3 files changed, 12 insertions(+), 0 deletions(-)

diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c
index dbfb44a..eddb71e 100644
--- a/bin/varnishd/cache/cache_fetch.c
+++ b/bin/varnishd/cache/cache_fetch.c
@@ -189,6 +189,8 @@ FetchStorage(struct worker *wrk, ssize_t sz)
 		l = sz;
 	if (l == 0)
 		l = cache_param->fetch_chunksize;
+	if (wrk->busyobj->do_stream && l > cache_param->stream_maxchunksize)
+		l = cache_param->stream_maxchunksize;
 	st = STV_alloc(wrk, l);
 	if (st == NULL) {
 		(void)FetchError(wrk, "Could not get storage");
diff --git a/bin/varnishd/common/params.h b/bin/varnishd/common/params.h
index 8198a78..cc4a6b8 100644
--- a/bin/varnishd/common/params.h
+++ b/bin/varnishd/common/params.h
@@ -97,6 +97,7 @@ struct params {
 	/* Fetcher hints */
 	ssize_t			fetch_chunksize;
 	ssize_t			fetch_maxchunksize;
+	ssize_t			stream_maxchunksize;
 	unsigned		nuke_limit;
 
 #ifdef SENDFILE_WORKS
diff --git a/bin/varnishd/mgt/mgt_param.c b/bin/varnishd/mgt/mgt_param.c
index 1935344..190de40 100644
--- a/bin/varnishd/mgt/mgt_param.c
+++ b/bin/varnishd/mgt/mgt_param.c
@@ -832,6 +832,15 @@ static const struct parspec input_parspec[] = {
 		"fragmentation.\n",
 		EXPERIMENTAL,
 		"256m", "bytes" },
+	{ "stream_maxchunksize",
+		tweak_bytes_u,
+		    &mgt_param.stream_maxchunksize, 4 * 1024, UINT_MAX,
+		"The maximum chunksize we attempt to allocate from storage "
+		"when streaming. This also defines the intervals at which "
+		"the streaming clients receive notifications about new "
+		"data available.\n",
+		EXPERIMENTAL,
+		"256k", "bytes" },
 #ifdef SENDFILE_WORKS
 	{ "sendfile_threshold",
 		tweak_bytes, &mgt_param.sendfile_threshold, 0, HUGE_VAL,
-- 
1.7.4.1

From 42c2432c2c3f6157172b09ff2a7bd67a5a40a38b Mon Sep 17 00:00:00 2001
From: Martin Blix Grydeland <[email protected]>
Date: Thu, 15 Dec 2011 13:00:20 +0100
Subject: [PATCH 04/10] Don't free the object store when fetch fails and streaming is in
 effect, as there might be threads reading this data.

---
 bin/varnishd/cache/cache_fetch.c |   13 ++++++++-----
 1 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c
index eddb71e..a8cd23e 100644
--- a/bin/varnishd/cache/cache_fetch.c
+++ b/bin/varnishd/cache/cache_fetch.c
@@ -579,11 +579,14 @@ FetchBody(struct worker *wrk, struct busyobj *bo)
 
 	if (cls < 0) {
 		wrk->stats.fetch_failed++;
-		/* XXX: Wouldn't this store automatically be released ? */
-		while (!VTAILQ_EMPTY(&obj->store)) {
-			st = VTAILQ_FIRST(&obj->store);
-			VTAILQ_REMOVE(&obj->store, st, list);
-			STV_free(st);
+		if (bo->do_stream == 0) {
+			/* XXX: Wouldn't this store automatically be
+			 * released ? */
+			while (!VTAILQ_EMPTY(&obj->store)) {
+				st = VTAILQ_FIRST(&obj->store);
+				VTAILQ_REMOVE(&obj->store, st, list);
+				STV_free(st);
+			}
 		}
 		VDI_CloseFd(wrk, &bo->vbc);
 		obj->len = 0;
-- 
1.7.4.1

From 5030bae7b097407c4ace94e07a738e7895c3b3b4 Mon Sep 17 00:00:00 2001
From: Martin Blix Grydeland <[email protected]>
Date: Thu, 15 Dec 2011 13:40:43 +0100
Subject: [PATCH 05/10] Add a condvar to struct vbo

---
 bin/varnishd/cache/cache_busyobj.c |    3 +++
 1 files changed, 3 insertions(+), 0 deletions(-)

diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c
index ab4db76..9080aa3 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -42,6 +42,7 @@ struct vbo {
 	unsigned		magic;
 #define VBO_MAGIC		0xde3d8223
 	struct lock		mtx;
+	pthread_cond_t		cond;
 	unsigned		refcount;
 	uint16_t		nhttp;
 	struct busyobj		bo;
@@ -80,6 +81,7 @@ vbo_New(void)
 	vbo->magic = VBO_MAGIC;
 	vbo->nhttp = nhttp;
 	Lck_New(&vbo->mtx, lck_busyobj);
+	AZ(pthread_cond_init(&vbo->cond, NULL));
 	return (vbo);
 }
 
@@ -94,6 +96,7 @@ VBO_Free(struct vbo **vbop)
 	CHECK_OBJ_NOTNULL(vbo, VBO_MAGIC);
 	AZ(vbo->refcount);
 	Lck_Delete(&vbo->mtx);
+	AZ(pthread_cond_destroy(&vbo->cond));
 	FREE_OBJ(vbo);
 }
 
-- 
1.7.4.1

From ee4544fc8087f389a3affa98b28ade9a2f4441b4 Mon Sep 17 00:00:00 2001
From: Martin Blix Grydeland <[email protected]>
Date: Thu, 15 Dec 2011 15:04:56 +0100
Subject: [PATCH 06/10] Add stream data synchronization functions to cache_busyobj.c in
 preparation of threaded streaming.

VBO_StreamData is called by the fetch to update the busyobj with how
much data is available.

VBO_StreamSync is called by the dilvery to update it's local
stream_ctx struct with the new pointers.
---
 bin/varnishd/cache/cache.h         |   10 ++++++++++
 bin/varnishd/cache/cache_busyobj.c |   31 +++++++++++++++++++++++++++++++
 2 files changed, 41 insertions(+), 0 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index c265ce4..fc43010 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -273,6 +273,10 @@ struct stream_ctx {
 
 	/* First byte of storage if we free it as we go (pass) */
 	ssize_t			stream_front;
+	struct storage		*stream_frontchunk;
+
+	/* Max byte we can stream */
+	ssize_t			stream_max;
 };
 
 /*--------------------------------------------------------------------*/
@@ -525,6 +529,10 @@ struct busyobj {
 	unsigned		do_gzip;
 	unsigned		do_gunzip;
 	unsigned		do_stream;
+
+	/* Stream stuff */
+	ssize_t			stream_max;
+	struct storage		*stream_frontchunk;
 };
 
 /* Object structure --------------------------------------------------*/
@@ -703,6 +711,8 @@ struct busyobj *VBO_GetBusyObj(struct worker *wrk);
 void VBO_RefBusyObj(const struct busyobj *busyobj);
 void VBO_DerefBusyObj(struct worker *wrk, struct busyobj **busyobj);
 void VBO_Free(struct vbo **vbo);
+void VBO_StreamData(struct busyobj *busyobj);
+void VBO_StreamSync(struct worker *wrk);
 
 /* cache_center.c [CNT] */
 void CNT_Session(struct sess *sp);
diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c
index 9080aa3..988ae07 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -200,3 +200,34 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo)
 		}
 	}
 }
+
+/* Signal additional data available */
+void
+VBO_StreamData(struct busyobj *busyobj)
+{
+	CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+	CHECK_OBJ_NOTNULL(busyobj->fetch_obj, OBJECT_MAGIC);
+
+	assert(busyobj->fetch_obj->len >= busyobj->stream_max);
+	if (busyobj->fetch_obj->len > busyobj->stream_max) {
+		busyobj->stream_max = busyobj->fetch_obj->len;
+	}
+}
+
+/* Sync the client's stream_ctx with the busyobj */
+void
+VBO_StreamSync(struct worker *wrk)
+{
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+
+	wrk->sctx->stream_max = wrk->busyobj->stream_max;
+	if (wrk->obj->objcore == NULL ||
+	    (wrk->obj->objcore->flags & OC_F_PASS)) {
+		/* Give notice to backend fetch that we are finished
+		 * with all chunks before this one */
+		wrk->busyobj->stream_frontchunk = wrk->sctx->stream_frontchunk;
+	}
+}
-- 
1.7.4.1

From 3ba48dcfb5f4231dfa3147d5d63d9dda9d993ad5 Mon Sep 17 00:00:00 2001
From: Martin Blix Grydeland <[email protected]>
Date: Thu, 15 Dec 2011 15:09:47 +0100
Subject: [PATCH 07/10] Rework RES_StreamPoll to use the VBO_StreamData and VBO_StreamSync
 functions, and make the object data access thread safe (given proper
 locking in VBO_StreamData and VBO_StreamSync that is coming in a later
 patch).

---
 bin/varnishd/cache/cache.h          |    4 +-
 bin/varnishd/cache/cache_response.c |   83 +++++++++++++++++++++-------------
 2 files changed, 53 insertions(+), 34 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index fc43010..320f9ca 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -271,7 +271,7 @@ struct stream_ctx {
 	/* Next byte we will take from storage */
 	ssize_t			stream_next;
 
-	/* First byte of storage if we free it as we go (pass) */
+	/* Point in storage chunk chain we have reached */
 	ssize_t			stream_front;
 	struct storage		*stream_frontchunk;
 
@@ -950,7 +950,7 @@ void RES_BuildHttp(const struct sess *sp);
 void RES_WriteObj(struct sess *sp);
 void RES_StreamStart(struct sess *sp);
 void RES_StreamEnd(struct sess *sp);
-void RES_StreamPoll(struct worker *);
+void RES_StreamPoll(struct worker *wrk);
 
 /* cache_vary.c */
 struct vsb *VRY_Create(const struct sess *sp, const struct http *hp);
diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c
index 7c63d51..5038395 100644
--- a/bin/varnishd/cache/cache_response.c
+++ b/bin/varnishd/cache/cache_response.c
@@ -357,55 +357,74 @@ RES_StreamStart(struct sess *sp)
 }
 
 void
-RES_StreamPoll(struct worker *w)
+RES_StreamPoll(struct worker *wrk)
 {
 	struct stream_ctx *sctx;
 	struct storage *st;
-	ssize_t l, l2;
+	struct object *fetch_obj;
+	ssize_t l, l2, stlen;
 	void *ptr;
 
-	CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
-	CHECK_OBJ_NOTNULL(w->busyobj->fetch_obj, OBJECT_MAGIC);
-	sctx = w->sctx;
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC);
+	sctx = wrk->sctx;
 	CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
-	if (w->busyobj->fetch_obj->len == sctx->stream_next)
+
+	VBO_StreamData(wrk->busyobj);
+	VBO_StreamSync(wrk);
+
+	if (sctx->stream_max == sctx->stream_next)
 		return;
-	assert(w->busyobj->fetch_obj->len > sctx->stream_next);
+	assert(sctx->stream_max > sctx->stream_next);
+
 	l = sctx->stream_front;
-	VTAILQ_FOREACH(st, &w->busyobj->fetch_obj->store, list) {
-		if (st->len + l <= sctx->stream_next) {
-			l += st->len;
+	st = sctx->stream_frontchunk;
+	if (st == NULL)
+		st = VTAILQ_FIRST(&wrk->obj->store);
+	for (; st != NULL; st = VTAILQ_NEXT(st, list)) {
+		CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
+		sctx->stream_front = l;
+		sctx->stream_frontchunk = st;
+		stlen = st->len;
+		if (l + stlen <= sctx->stream_next) {
+			l += stlen;
 			continue;
 		}
-		l2 = st->len + l - sctx->stream_next;
+		assert(l + stlen > sctx->stream_next);
+		l2 = l + stlen - sctx->stream_next;
+		if (sctx->stream_next + l2 > sctx->stream_max)
+			l2 = sctx->stream_max - sctx->stream_next;
 		ptr = st->ptr + (sctx->stream_next - l);
-		if (w->res_mode & RES_GUNZIP) {
-			(void)VGZ_WrwGunzip(w, sctx->vgz, ptr, l2,
+		if (wrk->res_mode & RES_GUNZIP) {
+			(void)VGZ_WrwGunzip(wrk, sctx->vgz, ptr, l2,
 			    sctx->obuf, sctx->obuf_len, &sctx->obuf_ptr);
 		} else {
-			(void)WRW_Write(w, ptr, l2);
+			(void)WRW_Write(wrk, ptr, l2);
 		}
-		l += st->len;
 		sctx->stream_next += l2;
+		if (sctx->stream_next == sctx->stream_max)
+			break;
+		AN(VTAILQ_NEXT(st, list));
+		l += st->len;
 	}
-	if (!(w->res_mode & RES_GUNZIP))
-		(void)WRW_Flush(w);
+	if (!(wrk->res_mode & RES_GUNZIP))
+		(void)WRW_Flush(wrk);
 
-	if (w->busyobj->fetch_obj->objcore == NULL ||
-	    (w->busyobj->fetch_obj->objcore->flags & OC_F_PASS)) {
-		/*
-		 * This is a pass object, release storage as soon as we
-		 * have delivered it.
-		 */
-		while (1) {
-			st = VTAILQ_FIRST(&w->busyobj->fetch_obj->store);
-			if (st == NULL ||
-			    sctx->stream_front + st->len > sctx->stream_next)
-				break;
-			VTAILQ_REMOVE(&w->busyobj->fetch_obj->store, st, list);
-			sctx->stream_front += st->len;
-			STV_free(st);
-		}
+	if (wrk->busyobj->stream_frontchunk == NULL)
+		return;
+
+	/* It's a pass - remove chunks already delivered */
+	fetch_obj = wrk->busyobj->fetch_obj;
+	CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
+	assert(fetch_obj->objcore == NULL ||
+	       (fetch_obj->objcore->flags & OC_F_PASS));
+	while (1) {
+		st = VTAILQ_FIRST(&fetch_obj->store);
+		if (st == NULL || st == wrk->busyobj->stream_frontchunk)
+			break;
+		CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
+		VTAILQ_REMOVE(&fetch_obj->store, st, list);
+		STV_free(st);
 	}
 }
 
-- 
1.7.4.1

From 9b4eebe1dc52c64230c77bb179b91c0da158ab55 Mon Sep 17 00:00:00 2001
From: Martin Blix Grydeland <[email protected]>
Date: Thu, 15 Dec 2011 15:39:08 +0100
Subject: [PATCH 08/10] Add VBO_StreamStopped and VBO_StreamWait thread synchronization functions.

---
 bin/varnishd/cache/cache.h         |    6 ++++++
 bin/varnishd/cache/cache_busyobj.c |   21 +++++++++++++++++++++
 2 files changed, 27 insertions(+), 0 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 320f9ca..c3314f1 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -277,6 +277,9 @@ struct stream_ctx {
 
 	/* Max byte we can stream */
 	ssize_t			stream_max;
+
+	/* Backend fetch has finished */
+	unsigned		stream_stopped;
 };
 
 /*--------------------------------------------------------------------*/
@@ -533,6 +536,7 @@ struct busyobj {
 	/* Stream stuff */
 	ssize_t			stream_max;
 	struct storage		*stream_frontchunk;
+	unsigned		stream_stopped;
 };
 
 /* Object structure --------------------------------------------------*/
@@ -711,6 +715,8 @@ struct busyobj *VBO_GetBusyObj(struct worker *wrk);
 void VBO_RefBusyObj(const struct busyobj *busyobj);
 void VBO_DerefBusyObj(struct worker *wrk, struct busyobj **busyobj);
 void VBO_Free(struct vbo **vbo);
+void VBO_StreamStopped(struct busyobj *busyobj);
+void VBO_StreamWait(struct busyobj *busyobj);
 void VBO_StreamData(struct busyobj *busyobj);
 void VBO_StreamSync(struct worker *wrk);
 
diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c
index 988ae07..c8f61c6 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -201,6 +201,26 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo)
 	}
 }
 
+/* Signal that the fetch thread has stopped */
+void
+VBO_StreamStopped(struct busyobj *busyobj)
+{
+	Lck_Lock(&busyobj->vbo->mtx);
+	busyobj->stream_stopped = 1;
+	AZ(pthread_cond_signal(&busyobj->vbo->cond));
+	Lck_Unlock(&busyobj->vbo->mtx);
+}
+
+/* Wait for the stopped state */
+void
+VBO_StreamWait(struct busyobj *busyobj)
+{
+	Lck_Lock(&busyobj->vbo->mtx);
+	while (busyobj->stream_stopped == 0)
+		Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx, NULL);
+	Lck_Unlock(&busyobj->vbo->mtx);
+}
+
 /* Signal additional data available */
 void
 VBO_StreamData(struct busyobj *busyobj)
@@ -223,6 +243,7 @@ VBO_StreamSync(struct worker *wrk)
 	CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC);
 	CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
 
+	wrk->sctx->stream_stopped = wrk->busyobj->stream_stopped;
 	wrk->sctx->stream_max = wrk->busyobj->stream_max;
 	if (wrk->obj->objcore == NULL ||
 	    (wrk->obj->objcore->flags & OC_F_PASS)) {
-- 
1.7.4.1

From 1dbc27927e728123962ef88749cb7309167de7ef Mon Sep 17 00:00:00 2001
From: Martin Blix Grydeland <[email protected]>
Date: Thu, 15 Dec 2011 15:47:59 +0100
Subject: [PATCH 09/10] Use background thread fetching when streaming

---
 bin/varnishd/cache/cache.h          |    4 ++
 bin/varnishd/cache/cache_busyobj.c  |   13 +++++-
 bin/varnishd/cache/cache_center.c   |   41 ++++------------
 bin/varnishd/cache/cache_fetch.c    |   37 ++++++++++++++
 bin/varnishd/cache/cache_response.c |   92 ++++++++++++++++++++++++-----------
 5 files changed, 127 insertions(+), 60 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index c3314f1..31993cf 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -755,6 +755,8 @@ int FetchError(struct worker *w, const char *error);
 int FetchError2(struct worker *w, const char *error, const char *more);
 int FetchHdr(struct sess *sp, int need_host_hdr);
 int FetchBody(struct worker *w, struct busyobj *bo);
+void FetchBodyBackground(struct sess *sp, struct busyobj *bo);
+void FetchBodyWait(struct busyobj *bo);
 int FetchReqBody(struct sess *sp);
 void Fetch_Init(void);
 
@@ -955,6 +957,8 @@ void WSL_Flush(struct worker *w, int overflow);
 void RES_BuildHttp(const struct sess *sp);
 void RES_WriteObj(struct sess *sp);
 void RES_StreamStart(struct sess *sp);
+void RES_StreamBody(struct sess *sp);
+void RES_StreamWrite(struct sess *sp);
 void RES_StreamEnd(struct sess *sp);
 void RES_StreamPoll(struct worker *wrk);
 
diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c
index c8f61c6..02a85be 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -228,13 +228,17 @@ VBO_StreamData(struct busyobj *busyobj)
 	CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
 	CHECK_OBJ_NOTNULL(busyobj->fetch_obj, OBJECT_MAGIC);
 
+	Lck_Lock(&busyobj->vbo->mtx);
 	assert(busyobj->fetch_obj->len >= busyobj->stream_max);
 	if (busyobj->fetch_obj->len > busyobj->stream_max) {
 		busyobj->stream_max = busyobj->fetch_obj->len;
+		AZ(pthread_cond_signal(&busyobj->vbo->cond));
 	}
+	Lck_Unlock(&busyobj->vbo->mtx);
 }
 
-/* Sync the client's stream_ctx with the busyobj */
+/* Sync the client's stream_ctx with the busyobj, and block on no more
+ * data available */
 void
 VBO_StreamSync(struct worker *wrk)
 {
@@ -243,6 +247,12 @@ VBO_StreamSync(struct worker *wrk)
 	CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC);
 	CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
 
+	Lck_Lock(&wrk->busyobj->vbo->mtx);
+	while (!wrk->busyobj->stream_stopped &&
+	       wrk->sctx->stream_max == wrk->busyobj->stream_max) {
+		Lck_CondWait(&wrk->busyobj->vbo->cond, &wrk->busyobj->vbo->mtx,
+			     NULL);
+	}
 	wrk->sctx->stream_stopped = wrk->busyobj->stream_stopped;
 	wrk->sctx->stream_max = wrk->busyobj->stream_max;
 	if (wrk->obj->objcore == NULL ||
@@ -251,4 +261,5 @@ VBO_StreamSync(struct worker *wrk)
 		 * with all chunks before this one */
 		wrk->busyobj->stream_frontchunk = wrk->sctx->stream_frontchunk;
 	}
+	Lck_Unlock(&wrk->busyobj->vbo->mtx);
 }
diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c
index 335e87a..c5e32b5 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -932,58 +932,37 @@ DOT streambody -> DONE [style=bold,color=cyan]
 static int
 cnt_streambody(struct sess *sp)
 {
-	int i;
-	struct stream_ctx sctx;
-	uint8_t obuf[sp->wrk->res_mode & RES_GUNZIP ?
-	    cache_param->gzip_stack_buffer : 1];
 	struct worker *wrk;
 
 	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
 	wrk = sp->wrk;
 	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
-
 	CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
-	memset(&sctx, 0, sizeof sctx);
-	sctx.magic = STREAM_CTX_MAGIC;
-	AZ(wrk->sctx);
-	wrk->sctx = &sctx;
-
-	if (wrk->res_mode & RES_GUNZIP) {
-		sctx.vgz = VGZ_NewUngzip(wrk, "U S -");
-		sctx.obuf = obuf;
-		sctx.obuf_len = sizeof (obuf);
-	}
-
-	RES_StreamStart(sp);
 
 	AssertObjCorePassOrBusy(wrk->obj->objcore);
+	RES_StreamStart(sp);
 
-	AZ(wrk->busyobj->fetch_obj);
 	wrk->busyobj->fetch_obj = wrk->obj;
-	i = FetchBody(wrk, wrk->busyobj);
-	AZ(wrk->busyobj->fetch_obj);
+	FetchBodyBackground(sp, wrk->busyobj);
+	RES_StreamBody(sp);
 
-	http_Setup(wrk->busyobj->bereq, NULL);
-	http_Setup(wrk->busyobj->beresp, NULL);
-	wrk->busyobj->vfp = NULL;
-	AZ(wrk->busyobj->vbc);
-	AN(sp->director);
+	VBO_StreamWait(wrk->busyobj);
+	AN(wrk->busyobj->stream_stopped);
+	AZ(wrk->busyobj->fetch_obj);
 
-	if (!i && wrk->obj->objcore != NULL) {
+	if (wrk->busyobj->fetch_failed) {
+		sp->doclose = "Stream error";
+	} else if (wrk->obj->objcore != NULL) {
 		EXP_Insert(wrk->obj);
 		AN(wrk->obj->objcore);
 		AN(wrk->obj->objcore->ban);
 		HSH_Unbusy(wrk);
-	} else {
-		sp->doclose = "Stream error";
 	}
-	wrk->acct_tmp.fetch++;
+	sp->wrk->acct_tmp.fetch++;
 	sp->director = NULL;
 	sp->restarts = 0;
 
 	RES_StreamEnd(sp);
-	if (wrk->res_mode & RES_GUNZIP)
-		(void)VGZ_Destroy(&sctx.vgz, sp->vsl_id);
 
 	wrk->sctx = NULL;
 	assert(WRW_IsReleased(wrk));
diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c
index a8cd23e..fb5d72e 100644
--- a/bin/varnishd/cache/cache_fetch.c
+++ b/bin/varnishd/cache/cache_fetch.c
@@ -485,6 +485,43 @@ FetchHdr(struct sess *sp, int need_host_hdr)
 
 /*--------------------------------------------------------------------*/
 
+static void
+fetch_body_task(struct worker *wrk, void *priv)
+{
+	struct busyobj *bo;
+	struct object *obj;
+
+	CAST_OBJ_NOTNULL(bo, priv, BUSYOBJ_MAGIC);
+
+	CHECK_OBJ_NOTNULL(bo->fetch_obj, OBJECT_MAGIC);
+	CHECK_OBJ_NOTNULL(bo->vbc, VBC_MAGIC);
+
+	obj = bo->fetch_obj;
+	wrk->busyobj = bo;
+
+	bo->fetch_failed = FetchBody(wrk, bo);
+	AZ(bo->fetch_obj);
+	AZ(bo->vbc);
+	http_Setup(bo->bereq, NULL);
+	http_Setup(bo->beresp, NULL);
+	bo->vfp = NULL;
+	wrk->busyobj = NULL;
+	AZ(wrk->obj);
+
+	VBO_StreamStopped(bo);
+	VBO_DerefBusyObj(wrk, &bo);
+}
+
+void
+FetchBodyBackground(struct sess *sp, struct busyobj *bo)
+{
+	CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
+	CHECK_OBJ_NOTNULL(bo->fetch_obj, OBJECT_MAGIC);
+	AssertObjCorePassOrBusy(bo->fetch_obj->objcore);
+	VBO_RefBusyObj(bo);
+	SES_Schedule_Task(sp, fetch_body_task, bo);
+}
+
 int
 FetchBody(struct worker *wrk, struct busyobj *bo)
 {
diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c
index 5038395..c6f14f2 100644
--- a/bin/varnishd/cache/cache_response.c
+++ b/bin/varnishd/cache/cache_response.c
@@ -329,11 +329,6 @@ RES_WriteObj(struct sess *sp)
 void
 RES_StreamStart(struct sess *sp)
 {
-	struct stream_ctx *sctx;
-
-	sctx = sp->wrk->sctx;
-	CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
-
 	AZ(sp->wrk->res_mode & RES_ESI_CHILD);
 	AN(sp->wantbody);
 
@@ -357,22 +352,57 @@ RES_StreamStart(struct sess *sp)
 }
 
 void
-RES_StreamPoll(struct worker *wrk)
+RES_StreamBody(struct sess *sp)
+{
+	struct stream_ctx sctx;
+	struct busyobj *bo;
+	uint8_t obuf[sp->wrk->res_mode & RES_GUNZIP ?
+		     cache_param->gzip_stack_buffer : 1];
+
+	bo = sp->wrk->busyobj;
+	CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
+	AN(sp->wantbody);
+
+	memset(&sctx, 0, sizeof sctx);
+	sctx.magic = STREAM_CTX_MAGIC;
+	AZ(sp->wrk->sctx);
+	sp->wrk->sctx = &sctx;
+
+	if (sp->wrk->res_mode & RES_GUNZIP) {
+		sctx.vgz = VGZ_NewUngzip(sp->wrk, "U S -");
+		sctx.obuf = obuf;
+		sctx.obuf_len = sizeof (obuf);
+	}
+
+	while (!sctx.stream_stopped || sctx.stream_next < sctx.stream_max)  {
+		VBO_StreamSync(sp->wrk);
+		RES_StreamWrite(sp);
+	}
+
+	if (sp->wrk->res_mode & RES_GUNZIP) {
+		if (sctx.obuf_ptr > 0)
+			(void)WRW_Write(sp->wrk, sctx.obuf, sctx.obuf_ptr);
+		VGZ_Destroy(&sctx.vgz, sp->vsl_id);
+	}
+	sp->wrk->sctx = NULL;
+}
+
+void
+RES_StreamWrite(struct sess *sp)
 {
+	struct worker *wrk;
 	struct stream_ctx *sctx;
 	struct storage *st;
-	struct object *fetch_obj;
 	ssize_t l, l2, stlen;
 	void *ptr;
 
+	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+	wrk = sp->wrk;
 	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
 	CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC);
 	sctx = wrk->sctx;
 	CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
 
-	VBO_StreamData(wrk->busyobj);
-	VBO_StreamSync(wrk);
-
 	if (sctx->stream_max == sctx->stream_next)
 		return;
 	assert(sctx->stream_max > sctx->stream_next);
@@ -409,11 +439,34 @@ RES_StreamPoll(struct worker *wrk)
 	}
 	if (!(wrk->res_mode & RES_GUNZIP))
 		(void)WRW_Flush(wrk);
+}
+
+void
+RES_StreamEnd(struct sess *sp)
+{
+	if (sp->wrk->res_mode & RES_CHUNKED &&
+	    !(sp->wrk->res_mode & RES_ESI_CHILD))
+		WRW_EndChunk(sp->wrk);
+	if (WRW_FlushRelease(sp->wrk))
+		SES_Close(sp, "remote closed");
+}
+
+void
+RES_StreamPoll(struct worker *wrk)
+{
+	struct object *fetch_obj;
+	struct storage *st;
+
+	CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+
+	VBO_StreamData(wrk->busyobj);
 
 	if (wrk->busyobj->stream_frontchunk == NULL)
 		return;
 
-	/* It's a pass - remove chunks already delivered */
+	/* It's a pass - remove chunks already delivered. Should be OK
+	 * to do lock-free, as we are not fiddling pointers of any
+	 * storage chunk passed busyobj->stream_frontchunk */
 	fetch_obj = wrk->busyobj->fetch_obj;
 	CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
 	assert(fetch_obj->objcore == NULL ||
@@ -427,20 +480,3 @@ RES_StreamPoll(struct worker *wrk)
 		STV_free(st);
 	}
 }
-
-void
-RES_StreamEnd(struct sess *sp)
-{
-	struct stream_ctx *sctx;
-
-	sctx = sp->wrk->sctx;
-	CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
-
-	if (sp->wrk->res_mode & RES_GUNZIP && sctx->obuf_ptr > 0)
-		(void)WRW_Write(sp->wrk, sctx->obuf, sctx->obuf_ptr);
-	if (sp->wrk->res_mode & RES_CHUNKED &&
-	    !(sp->wrk->res_mode & RES_ESI_CHILD))
-		WRW_EndChunk(sp->wrk);
-	if (WRW_FlushRelease(sp->wrk))
-		SES_Close(sp, "remote closed");
-}
-- 
1.7.4.1

From 8d07e52a23fd42582ff1a6adf1c9335138b509e4 Mon Sep 17 00:00:00 2001
From: Martin Blix Grydeland <[email protected]>
Date: Thu, 15 Dec 2011 15:53:19 +0100
Subject: [PATCH 10/10] Add a couple of streaming test cases

---
 bin/varnishtest/tests/t00002.vtc |   38 ++++++++++++++++++++++++++++++++++++++
 bin/varnishtest/tests/t00003.vtc |   33 +++++++++++++++++++++++++++++++++
 2 files changed, 71 insertions(+), 0 deletions(-)
 create mode 100644 bin/varnishtest/tests/t00002.vtc
 create mode 100644 bin/varnishtest/tests/t00003.vtc

diff --git a/bin/varnishtest/tests/t00002.vtc b/bin/varnishtest/tests/t00002.vtc
new file mode 100644
index 0000000..440577c
--- /dev/null
+++ b/bin/varnishtest/tests/t00002.vtc
@@ -0,0 +1,38 @@
+varnishtest "Test streaming recepient in pass mode"
+
+server s1 {
+	rxreq
+	txresp -nolen -hdr "Transfer-encoding: chunked"
+	chunked "<1>------------------------<1>\n"
+	sema r1 sync 2
+	chunked "<2>------------------------<2>\n"
+	sema r2 sync 2
+	chunkedlen 0
+} -start
+
+varnish v1 -vcl+backend {
+	sub vcl_recv {
+		return (pass);
+	}
+
+	sub vcl_fetch {
+		set beresp.do_stream = true;
+	}
+} -start
+
+client c1 {
+	txreq -hdr "foo: /foo"
+	rxresp -no_obj
+
+	rxchunk
+	expect resp.chunklen == 31
+	sema r1 sync 2
+
+	rxchunk
+	expect resp.chunklen == 31
+	sema r2 sync 2
+
+	rxchunk
+	expect resp.chunklen == 0
+	expect resp.bodylen == 62
+} -run
diff --git a/bin/varnishtest/tests/t00003.vtc b/bin/varnishtest/tests/t00003.vtc
new file mode 100644
index 0000000..6681595
--- /dev/null
+++ b/bin/varnishtest/tests/t00003.vtc
@@ -0,0 +1,33 @@
+varnishtest "Test failed streamed delivery not entering cache"
+
+server s1 {
+	rxreq
+	txresp -nolen -hdr "Transfer-encoding: chunked"
+	chunked "<1>------------------------<1>\n"
+	chunked "<2>------------------------<2>\n"
+	accept
+	rxreq
+	txresp -nolen -hdr "Transfer-encoding: chunked"
+	chunked "<1>------------------------<1>\n"
+	chunked "<2>------------------------<2>\n"
+	chunked "<3>------------------------<3>\n"
+	chunkedlen 0
+} -start
+
+varnish v1 -vcl+backend {
+	sub vcl_fetch {
+		set beresp.do_stream = true;
+	}
+} -start
+
+client c1 {
+	txreq
+	rxresp
+	delay 1
+} -run
+
+client c2 {
+	txreq
+	rxresp
+	expect resp.bodylen == 93
+} -run
-- 
1.7.4.1

_______________________________________________
varnish-dev mailing list
[email protected]
https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev

Reply via email to