Thread grabbing statistics
---
 bin/varnishd/cache/cache.h         |   16 ++++++
 bin/varnishd/cache/cache_pool.c    |  106 +++++++++++++++++++++++++++++++++++-
 bin/varnishd/cache/cache_session.c |   21 +++++++
 bin/varnishd/cache/cache_wrk.c     |   18 ++++++
 include/tbl/vsc_f_main.h           |   15 +++++
 5 files changed, 174 insertions(+), 2 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 817b7c7..11f0f6a 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -294,8 +294,11 @@ enum e_do_what {
        pool_do_accept,
        pool_do_nothing,
        pool_do_die,
+       pool_do_task,
 };
 
+typedef void taskfunc(struct worker *, void *priv);
+
 struct worker {
        unsigned                magic;
 #define WORKER_MAGIC           0x6391adcf
@@ -331,6 +334,15 @@ struct worker {
        /* Stream state */
        struct stream_ctx       *sctx;
 
+       /* Task */
+       taskfunc                *taskfunc;
+       void                    *taskpriv;
+
+       /* ESI delivery stuff */
+       int                     gzip_resp;
+       ssize_t                 l_crc;
+       uint32_t                crc;
+
        /* Timeouts */
        double                  connect_timeout;
        double                  first_byte_timeout;
@@ -894,6 +906,8 @@ void PipeSession(struct sess *sp);
 void Pool_Init(void);
 void Pool_Work_Thread(void *priv, struct worker *w);
 int Pool_Schedule(struct pool *pp, struct sess *sp);
+struct worker *Pool_GrabWorker(struct pool *pp, struct worker *wrk,
+                              unsigned timeout);
 
 #define WRW_IsReleased(w)      ((w)->wrw.wfd == NULL)
 int WRW_Error(const struct worker *w);
@@ -920,6 +934,7 @@ int SES_Schedule(struct sess *sp);
 void SES_Handle(struct sess *sp, double now);
 void SES_GetReq(struct sess *sp);
 void SES_ReleaseReq(struct sess *sp);
+struct worker *SES_GrabWorker(struct sess *sp, unsigned timeout);
 
 /* cache_shmlog.c */
 extern struct VSC_C_main *VSC_C_main;
@@ -1007,6 +1022,7 @@ void *WRK_thread(void *priv);
 typedef void *bgthread_t(struct sess *, void *priv);
 void WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func,
     void *priv);
+void WRK_DoTask(struct worker *wrk, taskfunc *func, void *priv);
 
 /* cache_ws.c */
 
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index 84a6cfd..89c96ec 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -89,6 +89,14 @@ struct poolsock {
        struct listen_sock              *lsock;
 };
 
+struct grabber {
+       unsigned                        magic;
+#define GRABBER_MAGIC                  0x1f6f53f5
+       VTAILQ_ENTRY(grabber)           list;
+       struct worker                   *wrk_waiting;
+       struct worker                   *wrk_grabbed;
+};
+
 /* Number of work requests queued in excess of worker threads available */
 
 struct pool {
@@ -104,6 +112,7 @@ struct pool {
        struct workerhead       idle;
        VTAILQ_HEAD(, sess)     queue;
        VTAILQ_HEAD(, poolsock) socks;
+       VTAILQ_HEAD(, grabber)  grabqueue;
        unsigned                nthr;
        unsigned                lqueue;
        unsigned                last_lqueue;
@@ -183,6 +192,7 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
        struct pool *pp;
        int stats_clean, i;
        struct poolsock *ps;
+       struct grabber *grabber;
 
        CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);
        wrk->pool = pp;
@@ -197,8 +207,22 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
 
                WS_Reset(wrk->ws, NULL);
 
-               wrk->sp = VTAILQ_FIRST(&pp->queue);
-               if (wrk->sp != NULL) {
+               grabber = VTAILQ_FIRST(&pp->grabqueue);
+               if (grabber == NULL)
+                       wrk->sp = VTAILQ_FIRST(&pp->queue);
+               if (grabber != NULL) {
+                       /* We've been grabbed */
+                       assert(pp->lqueue > 0);
+                       VTAILQ_REMOVE(&pp->grabqueue, grabber, list);
+                       pp->lqueue--;
+                       CHECK_OBJ_NOTNULL(grabber, GRABBER_MAGIC);
+                       CHECK_OBJ_NOTNULL(grabber->wrk_waiting, WORKER_MAGIC);
+                       AZ(grabber->wrk_grabbed);
+                       grabber->wrk_grabbed = wrk;
+                       AZ(pthread_cond_signal(&grabber->wrk_waiting->cond));
+                       /* Wait for work to be assigned to us */
+                       (void)Lck_CondWait(&wrk->cond, &pp->mtx, NULL);
+               } else if (wrk->sp != NULL) {
                        /* Process queued requests, if any */
                        assert(pp->lqueue > 0);
                        VTAILQ_REMOVE(&pp->queue, wrk->sp, list);
@@ -271,6 +295,18 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
                                if (wrk->vcl != NULL)
                                        VCL_Rel(&wrk->vcl);
                        }
+               } else if (wrk->do_what == pool_do_task) {
+                       stats_clean = 0;
+                       wrk->lastused = NAN;
+
+                       AZ(wrk->sp);
+                       AN(wrk->taskfunc);
+                       wrk->taskfunc(wrk, wrk->taskpriv);
+                       wrk->taskfunc = NULL;
+                       wrk->taskpriv = NULL;
+
+                       WS_Assert(wrk->ws);
+                       AZ(wrk->busyobj);
                } else if (wrk->do_what == pool_do_nothing) {
                        /* we already did */
                } else {
@@ -334,6 +370,71 @@ Pool_Schedule(struct pool *pp, struct sess *sp)
 }
 
 /*--------------------------------------------------------------------
+ * Grab an idle worker to do a task, waiting up to timeout
+ * milliseconds for one to become idle.
+ * Returns:
+ *   pointer to an idle worker
+ *   NULL on failure to acquire idle worker within timeout
+ */
+
+struct worker *
+Pool_GrabWorker(struct pool *pp, struct worker *wrk, unsigned timeout)
+{
+       struct grabber grabber;
+       struct timespec ts;
+
+       CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+       CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+
+       memset(&grabber, 0, sizeof grabber);
+       grabber.magic = GRABBER_MAGIC;
+
+       Lck_Lock(&pp->mtx);
+
+       grabber.wrk_grabbed = VTAILQ_FIRST(&pp->idle);
+       if (grabber.wrk_grabbed != NULL) {
+               VTAILQ_REMOVE(&pp->idle, grabber.wrk_grabbed, list);
+               VSC_C_main->threadgrab_idle++;
+       }
+
+       if (grabber.wrk_grabbed == NULL && timeout > 0 &&
+           pp->lqueue < (cache_param->queue_max * pp->nthr) / 100) {
+               /* Wait up to timeout milliseconds for a worker to
+                * become idle */
+               AZ(clock_gettime(CLOCK_REALTIME, &ts));
+               ts.tv_sec += timeout / 1000;
+               ts.tv_nsec += (timeout % 1000) *  1000000;
+               if (ts.tv_nsec >= 1000000000) {
+                       ts.tv_sec++;
+                       ts.tv_nsec -= 1000000000;
+               }
+               grabber.wrk_waiting = wrk;
+               VTAILQ_INSERT_TAIL(&pp->grabqueue, &grabber, list);
+               pp->lqueue++;
+               AZ(pthread_cond_signal(&pp->herder_cond));
+               (void)Lck_CondWait(&wrk->cond, &pp->mtx, &ts);
+               if (grabber.wrk_grabbed != NULL) {
+                       VSC_C_main->threadgrab_waiting++;
+               } else {
+                       VTAILQ_REMOVE(&pp->grabqueue, &grabber, list);
+                       pp->lqueue--;
+               }
+       }
+
+       if (grabber.wrk_grabbed != NULL) {
+               CHECK_OBJ_NOTNULL(grabber.wrk_grabbed, WORKER_MAGIC);
+               AZ(grabber.wrk_grabbed->taskfunc);
+               AZ(grabber.wrk_grabbed->taskpriv);
+       } else {
+               VSC_C_main->threadgrab_failed++;
+       }
+
+       Lck_Unlock(&pp->mtx);
+
+       return (grabber.wrk_grabbed);
+}
+
+/*--------------------------------------------------------------------
  * Create another thread, if necessary & possible
  */
 
@@ -480,6 +581,7 @@ pool_mkpool(unsigned pool_no)
        XXXAN(pp);
        Lck_New(&pp->mtx, lck_wq);
 
+       VTAILQ_INIT(&pp->grabqueue);
        VTAILQ_INIT(&pp->queue);
        VTAILQ_INIT(&pp->idle);
        VTAILQ_INIT(&pp->socks);
diff --git a/bin/varnishd/cache/cache_session.c 
b/bin/varnishd/cache/cache_session.c
index d1dbaba..ca93bb5 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -159,6 +159,27 @@ SES_Schedule(struct sess *sp)
 }
 
 /*--------------------------------------------------------------------
+ * Grab an idle worker to do a task, waiting up to timeout
+ * milliseconds for one to become idle.
+ * Returns:
+ *   pointer to an idle woker
+ *   NULL on failure to acquire idle worker within timeout
+ */
+
+struct worker *
+SES_GrabWorker(struct sess *sp, unsigned timeout)
+{
+       struct sesspool *pp;
+
+       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+       pp = sp->sesspool;
+       CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
+       AN(pp->pool);
+
+       return (Pool_GrabWorker(pp->pool, sp->wrk, timeout));
+}
+
+/*--------------------------------------------------------------------
  * Handle a session (from waiter)
  */
 
diff --git a/bin/varnishd/cache/cache_wrk.c b/bin/varnishd/cache/cache_wrk.c
index 6087991..e866aa1 100644
--- a/bin/varnishd/cache/cache_wrk.c
+++ b/bin/varnishd/cache/cache_wrk.c
@@ -128,6 +128,24 @@ WRK_BgThread(pthread_t *thr, const char *name, bgthread_t 
*func, void *priv)
        AZ(pthread_create(thr, NULL, wrk_bgthread, bt));
 }
 
+/*--------------------------------------------------------------------
+ * Make the referenced worker execute the given task
+ */
+
+void
+WRK_DoTask(struct worker *wrk, taskfunc *func, void *priv)
+{
+       CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+       AN(func);
+
+       AZ(wrk->taskfunc);
+       AZ(wrk->taskpriv);
+       wrk->taskfunc = func;
+       wrk->taskpriv = priv;
+       wrk->do_what = pool_do_task;
+       AZ(pthread_cond_signal(&wrk->cond));
+}
+
 /*--------------------------------------------------------------------*/
 
 static void *
diff --git a/include/tbl/vsc_f_main.h b/include/tbl/vsc_f_main.h
index df7984a..b6b8dc9 100644
--- a/include/tbl/vsc_f_main.h
+++ b/include/tbl/vsc_f_main.h
@@ -193,6 +193,21 @@ VSC_F(thread_queue_len,            uint64_t, 0, 'g',
        "  See also param queue_max."
 )
 
+VSC_F(threadgrab_idle,         uint64_t, 0, 'c',
+    "Threads grabbed while idle",
+        "Number of times an idle thread was grabbed."
+)
+
+VSC_F(threadgrab_waiting,      uint64_t, 0, 'c',
+    "Threads grabbed while waiting",
+        "Number of times an idle thread was grabbed while waiting."
+)
+
+VSC_F(threadgrab_failed,       uint64_t, 0, 'c',
+    "Thread grab failed",
+        "Number of times failed to grab a thread within the timeout."
+)
+
 VSC_F(sess_queued,             uint64_t, 0, 'c',
     "Sessions queued for thread",
        "Number of times session was queued waiting for a thread."
-- 
1.7.4.1


_______________________________________________
varnish-dev mailing list
[email protected]
https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev

Reply via email to