commit 6a5f62f68f2c73dbbbbddb4fa3759586f4c2b0dc
Author: Nick Mathewson <ni...@torproject.org>
Date:   Tue Oct 3 12:49:34 2017 -0400

    Move responsibility for threadpool reply-handler events to workqueue
    
    This change makes cpuworker and test_workqueue no longer need to
    include event2/event.h.  Now workqueue.c needs to include it, but
    that is at least somewhat logical here.
---
 src/common/workqueue.c    | 47 ++++++++++++++++++++++++++++++++++++++++-------
 src/common/workqueue.h    |  5 ++++-
 src/or/cpuworker.c        | 25 +++++--------------------
 src/test/test_workqueue.c | 27 ++++++++++-----------------
 4 files changed, 59 insertions(+), 45 deletions(-)

diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index ec96959b7..12e31414e 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -1,3 +1,4 @@
+
 /* copyright (c) 2013-2015, The Tor Project, Inc. */
 /* See LICENSE for licensing information */
 
@@ -24,6 +25,7 @@
 
 #include "orconfig.h"
 #include "compat.h"
+#include "compat_libevent.h"
 #include "compat_threads.h"
 #include "crypto.h"
 #include "util.h"
@@ -31,6 +33,8 @@
 #include "tor_queue.h"
 #include "torlog.h"
 
+#include <event2/event.h>
+
 #define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
 #define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
 #define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
@@ -63,6 +67,9 @@ struct threadpool_s {
   void (*free_update_arg_fn)(void *);
   /** Array of n_threads update arguments. */
   void **update_args;
+  /** Event to notice when another thread has sent a reply. */
+  struct event *reply_event;
+  void (*reply_cb)(threadpool_t *);
 
   /** Number of elements in threads. */
   int n_threads;
@@ -597,15 +604,41 @@ replyqueue_new(uint32_t alertsocks_flags)
   return rq;
 }
 
-/**
- * Return the "read socket" for a given reply queue.  The main thread should
- * listen for read events on this socket, and call replyqueue_process() every
- * time it triggers.
+/** Internal: Run from the libevent mainloop when there is work to handle in
+ * the reply queue handler. */
+static void
+reply_event_cb(evutil_socket_t sock, short events, void *arg)
+{
+  threadpool_t *tp = arg;
+  (void) sock;
+  (void) events;
+  replyqueue_process(tp->reply_queue);
+  if (tp->reply_cb)
+    tp->reply_cb(tp);
+}
+
+/** Register the threadpool <b>tp</b>'s reply queue with the libevent
+ * mainloop. If <b>cb</b> is provided, it is run after each time
+ * there is work to process from the reply queue. Return 0 on
+ * success, -1 on failure.
  */
-tor_socket_t
-replyqueue_get_socket(replyqueue_t *rq)
+int
+threadpool_register_reply_event(threadpool_t *tp,
+                                void (*cb)(threadpool_t *tp))
 {
-  return rq->alert.read_fd;
+  struct event_base *base = tor_libevent_get_base();
+
+  if (tp->reply_event) {
+    tor_event_free(tp->reply_event);
+  }
+  tp->reply_event = tor_event_new(base,
+                                  tp->reply_queue->alert.read_fd,
+                                  EV_READ|EV_PERSIST,
+                                  reply_event_cb,
+                                  tp);
+  tor_assert(tp->reply_event);
+  tp->reply_cb = cb;
+  return event_add(tp->reply_event, NULL);
 }
 
 /**
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
index eb885e680..e1fe612e2 100644
--- a/src/common/workqueue.h
+++ b/src/common/workqueue.h
@@ -56,8 +56,11 @@ threadpool_t *threadpool_new(int n_threads,
 replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp);
 
 replyqueue_t *replyqueue_new(uint32_t alertsocks_flags);
-tor_socket_t replyqueue_get_socket(replyqueue_t *rq);
 void replyqueue_process(replyqueue_t *queue);
 
+struct event_base;
+int threadpool_register_reply_event(threadpool_t *tp,
+                                    void (*cb)(threadpool_t *tp));
+
 #endif /* !defined(TOR_WORKQUEUE_H) */
 
diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c
index 7da7dc5f8..083691c4f 100644
--- a/src/or/cpuworker.c
+++ b/src/or/cpuworker.c
@@ -30,8 +30,6 @@
 #include "router.h"
 #include "workqueue.h"
 
-#include <event2/event.h>
-
 static void queue_pending_tasks(void);
 
 typedef struct worker_state_s {
@@ -69,22 +67,12 @@ worker_state_free_void(void *arg)
 
 static replyqueue_t *replyqueue = NULL;
 static threadpool_t *threadpool = NULL;
-static struct event *reply_event = NULL;
 
 static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
 
 static int total_pending_tasks = 0;
 static int max_pending_tasks = 128;
 
-static void
-replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
-{
-  replyqueue_t *rq = arg;
-  (void) sock;
-  (void) events;
-  replyqueue_process(rq);
-}
-
 /** Initialize the cpuworker subsystem. It is OK to call this more than once
  * during Tor's lifetime.
  */
@@ -94,14 +82,6 @@ cpu_init(void)
   if (!replyqueue) {
     replyqueue = replyqueue_new(0);
   }
-  if (!reply_event) {
-    reply_event = tor_event_new(tor_libevent_get_base(),
-                                replyqueue_get_socket(replyqueue),
-                                EV_READ|EV_PERSIST,
-                                replyqueue_process_cb,
-                                replyqueue);
-    event_add(reply_event, NULL);
-  }
   if (!threadpool) {
     /*
       In our threadpool implementation, half the threads are permissive and
@@ -115,7 +95,12 @@ cpu_init(void)
                                 worker_state_new,
                                 worker_state_free_void,
                                 NULL);
+
+    int r = threadpool_register_reply_event(threadpool, NULL);
+
+    tor_assert(r == 0);
   }
+
   /* Total voodoo. Can we make this more sensible? */
   max_pending_tasks = get_num_cpus(get_options()) * 64;
   crypto_seed_weak_rng(&request_sample_rng);
diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c
index 2a0261147..940973cda 100644
--- a/src/test/test_workqueue.c
+++ b/src/test/test_workqueue.c
@@ -12,7 +12,6 @@
 #include "compat_libevent.h"
 
 #include <stdio.h>
-#include <event2/event.h>
 
 #define MAX_INFLIGHT (1<<16)
 
@@ -159,6 +158,7 @@ static tor_weak_rng_t weak_rng;
 static int n_sent = 0;
 static int rsa_sent = 0;
 static int ecdh_sent = 0;
+static int n_received_previously = 0;
 static int n_received = 0;
 static int no_shutdown = 0;
 
@@ -256,19 +256,13 @@ add_n_work_items(threadpool_t *tp, int n)
 static int shutting_down = 0;
 
 static void
-replysock_readable_cb(tor_socket_t sock, short what, void *arg)
+replysock_readable_cb(threadpool_t *tp)
 {
-  threadpool_t *tp = arg;
-  replyqueue_t *rq = threadpool_get_replyqueue(tp);
-
-  int old_r = n_received;
-  (void) sock;
-  (void) what;
-
-  replyqueue_process(rq);
-  if (old_r == n_received)
+  if (n_received_previously == n_received)
     return;
 
+  n_received_previously = n_received;
+
   if (opt_verbose) {
     printf("%d / %d", n_received, n_sent);
     if (opt_n_cancel)
@@ -337,7 +331,6 @@ main(int argc, char **argv)
   threadpool_t *tp;
   int i;
   tor_libevent_cfg evcfg;
-  struct event *ev;
   uint32_t as_flags = 0;
 
   for (i = 1; i < argc; ++i) {
@@ -411,11 +404,11 @@ main(int argc, char **argv)
   memset(&evcfg, 0, sizeof(evcfg));
   tor_libevent_initialize(&evcfg);
 
-  ev = tor_event_new(tor_libevent_get_base(),
-                     replyqueue_get_socket(rq), EV_READ|EV_PERSIST,
-                     replysock_readable_cb, tp);
-
-  event_add(ev, NULL);
+  {
+    int r = threadpool_register_reply_event(tp,
+                                            replysock_readable_cb);
+    tor_assert(r == 0);
+  }
 
 #ifdef TRACK_RESPONSES
   handled = bitarray_init_zero(opt_n_items);



_______________________________________________
tor-commits mailing list
tor-commits@lists.torproject.org
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to