The problem:
CommCalls functionality is conflated with calls created to do general FD handling (FD as pipe handle, FD as disk handle, FD as pointer into the fd_table structure). Sometimes this is because those calls perform operations mirroring comm handlers and also use an FD. None of this actually requires the comm layer to be involved, though. The Comm::Connection objects which comm handlers pass around are also very inappropriate for these FD types.


This patch adds functionality for two types of AsyncCall to replace the Comm parameters being abused.

* FdeCbParams is added to the CommCalls infrastructure, for use internally and at layers "lower" than the comm API, to pass around raw FD values. It should be avoided for TCP socket FDs, but may be used by callers needing an FD where Comm::Connection is inappropriate.

* The UnaryCbdataDialer<T> template is added to dial a call on any wrapper function taking a pointer to one cbdata state object as its parameter. This dialer checks the validity of the cbdata base object prior to calling. As such it acts like the cbdata callbacks, but adds async scheduling ability. It also gives the wrapper functions the AsyncCalls ability to take typed parameters instead of "void *data" and casting.


Alex: I've been searching the Async code for a long while trying to figure out how to do this with the existing API. If you can point me at how to do it without adding a new template I'd be grateful. Alternatively, I'd be grateful for any reasons why this does not already exist.

For now this dialer template is isolated in helper.cc and used only for the helper pipe() FD close handlers. If others agree I'd like to make it globally available in a new src/base/AsyncCbdataCalls.h and use it to ease the transition from cbdata objects to Jobs. It seems the big hitch to transitioning currently is that, to use the generic Unary/Nullary templates, the object must be a fully transitioned AsyncJob, so we end up with custom-coded dialers for each cbdata class type.

Amos
=== modified file 'src/CommCalls.cc'
--- src/CommCalls.cc    2011-11-22 12:00:59 +0000
+++ src/CommCalls.cc    2011-11-22 12:09:03 +0000
@@ -108,6 +108,12 @@
 {
 }
 
+/* FdeCbParams */
+
+FdeCbParams::FdeCbParams(void *aData):
+        CommCommonCbParams(aData)
+{
+}
 
 /* CommAcceptCbPtrFun */
 
@@ -231,3 +237,25 @@
     params.print(os);
     os << ')';
 }
+
+/* FdeCbPtrFun */
+
+FdeCbPtrFun::FdeCbPtrFun(FDECB *aHandler, const FdeCbParams &aParams) :
+        CommDialerParamsT<FdeCbParams>(aParams),
+        handler(aHandler)
+{
+}
+
+void
+FdeCbPtrFun::dial()
+{
+    handler(params);
+}
+
+void
+FdeCbPtrFun::print(std::ostream &os) const
+{
+    os << '(';
+    params.print(os);
+    os << ')';
+}

=== modified file 'src/CommCalls.h'
--- src/CommCalls.h     2011-11-22 12:00:59 +0000
+++ src/CommCalls.h     2011-11-22 12:09:43 +0000
@@ -35,6 +35,11 @@
 class CommCloseCbParams;
 typedef void CLCB(const CommCloseCbParams &params);
 
+/// Special Calls, for direct use of an FD without a controlling 
Comm::Connection
+/// This is used for pipe() FD with helpers, and internally by Comm when 
handling some special FD actions.
+class FdeCbParams;
+typedef void FDECB(const FdeCbParams &params);
+
 /*
  * TODO: When there are no function-pointer-based callbacks left, all
  * this complexity can be removed. Jobs that need comm services will just
@@ -79,7 +84,7 @@
     comm_err_t flag;  ///< comm layer result status.
     int xerrno;      ///< The last errno to occur. non-zero if flag is 
COMM_ERR.
 
-    int fd; // raw FD which the call was about. use conn instead for new code.
+    int fd; /// FD which the call was about. Set by the caller creator. Use 
conn instead for new code.
 private:
     // should not be needed and not yet implemented
     CommCommonCbParams &operator =(const CommCommonCbParams &params);
@@ -128,6 +133,14 @@
     CommTimeoutCbParams(void *aData);
 };
 
+class FdeCbParams: public CommCommonCbParams
+{
+public:
+    FdeCbParams(void *aData);
+    // TODO make this a standalone object with FD value and pointer to fde 
table entry.
+    // that requires all the existing Comm handlers to be updated first though
+};
+
 // Interface to expose comm callback parameters of all comm dialers.
 // GetCommParams() uses this interface to access comm parameters.
 template <class Params_>
@@ -268,6 +281,21 @@
     CTCB *handler;
 };
 
+/// FD event (FDCB) dialer
+class FdeCbPtrFun: public CallDialer,
+        public CommDialerParamsT<FdeCbParams>
+{
+public:
+    typedef FdeCbParams Params;
+
+    FdeCbPtrFun(FDECB *aHandler, const Params &aParams);
+    void dial();
+    virtual void print(std::ostream &os) const;
+
+public:
+    FDECB *handler;
+};
+
 // AsyncCall to comm handlers implemented as global functions.
 // The dialer is one of the Comm*CbPtrFunT above
 // TODO: Get rid of this class by moving canFire() to canDial() method

=== modified file 'src/comm.cc'
--- src/comm.cc 2011-11-22 12:00:59 +0000
+++ src/comm.cc 2011-11-23 11:19:52 +0000
@@ -1048,7 +1048,7 @@
 
 #if USE_SSL
 void
-commStartSslClose(const CommCloseCbParams &params)
+commStartSslClose(const FdeCbParams &params)
 {
     assert(&fd_table[params.fd].ssl);
     ssl_shutdown_method(fd_table[params.fd].ssl);
@@ -1056,7 +1056,7 @@
 #endif
 
 void
-comm_close_complete(const CommCloseCbParams &params)
+comm_close_complete(const FdeCbParams &params)
 {
 #if USE_SSL
     fde *F = &fd_table[params.fd];
@@ -1119,10 +1119,9 @@
 
 #if USE_SSL
     if (F->ssl) {
-        // XXX: make this a generic async call passing one FD parameter. No 
need to use CommCloseCbParams
         AsyncCall::Pointer startCall=commCbCall(5,4, "commStartSslClose",
-                                                
CommCloseCbPtrFun(commStartSslClose, NULL));
-        CommCloseCbParams &startParams = 
GetCommParams<CommCloseCbParams>(startCall);
+                                                FdeCbPtrFun(commStartSslClose, 
NULL));
+        FdeCbParams &startParams = GetCommParams<FdeCbParams>(startCall);
         startParams.fd = fd;
         ScheduleCallHere(startCall);
     }
@@ -1162,8 +1161,8 @@
 
 
     AsyncCall::Pointer completeCall=commCbCall(5,4, "comm_close_complete",
-                                    CommCloseCbPtrFun(comm_close_complete, 
NULL));
-    CommCloseCbParams &completeParams = 
GetCommParams<CommCloseCbParams>(completeCall);
+                                    FdeCbPtrFun(comm_close_complete, NULL));
+    FdeCbParams &completeParams = GetCommParams<FdeCbParams>(completeCall);
     completeParams.fd = fd;
     // must use async call to wait for all callbacks
     // scheduled before comm_close() to finish

=== modified file 'src/helper.cc'
--- src/helper.cc       2011-11-22 12:00:59 +0000
+++ src/helper.cc       2011-11-23 10:47:49 +0000
@@ -53,8 +53,8 @@
 
 static IOCB helperHandleRead;
 static IOCB helperStatefulHandleRead;
-static CLCB helperServerFree;
-static CLCB helperStatefulServerFree;
+static void helperServerFree(helper_server *srv);
+static void helperStatefulServerFree(helper_stateful_server *srv);
 static void Enqueue(helper * hlp, helper_request *);
 static helper_request *Dequeue(helper * hlp);
 static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
@@ -76,6 +76,35 @@
 CBDATA_CLASS_INIT(statefulhelper);
 CBDATA_TYPE(helper_stateful_server);
 
+// dialer to run cbdata object members as Async Calls
+// to ease the transition of these cbdata objects to full Jobs
+template<class T>
+class UnaryCbdataDialer : public CallDialer
+{
+public:
+    typedef T Params;
+    typedef void Handler(T *);
+
+    UnaryCbdataDialer(Handler *aHandler, Params *aParam) :
+            data(cbdataReference(aParam)),
+            handler(aHandler)
+    {}
+
+    ~UnaryCbdataDialer() { cbdataReferenceDone(data); }
+
+    bool canDial(AsyncCall &call) { return true; }
+    void dial(AsyncCall &call) {
+        if (cbdataReferenceValid(data))
+            handler(data);
+    }
+
+    void print(std::ostream &os) const {} // dummy for now.
+
+public:
+    Params *data;
+    Handler *handler;
+};
+
 void
 HelperServerBase::closePipesSafely()
 {
@@ -233,7 +262,9 @@
         if (wfd != rfd)
             commSetNonBlocking(wfd);
 
-        comm_add_close_handler(rfd, helperServerFree, srv);
+        typedef UnaryCbdataDialer<helper_server> SrvDialer;
+        srv->asyncDestructor = asyncCall(5,4, "helperServerFree", 
SrvDialer(helperServerFree, srv));
+        comm_add_close_handler(rfd, srv->asyncDestructor);
 
         AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
                                              CommIoCbPtrFun(helperHandleRead, 
srv));
@@ -353,7 +384,9 @@
         if (wfd != rfd)
             commSetNonBlocking(wfd);
 
-        comm_add_close_handler(rfd, helperStatefulServerFree, srv);
+        typedef UnaryCbdataDialer<helper_stateful_server> SrvDialer;
+        srv->asyncDestructor = asyncCall(5,4, "helperServerFree", 
SrvDialer(helperStatefulServerFree, srv));
+        comm_add_close_handler(rfd, srv->asyncDestructor);
 
         AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
                                              
CommIoCbPtrFun(helperStatefulHandleRead, srv));
@@ -669,9 +702,8 @@
 /* ====================================================================== */
 
 static void
-helperServerFree(const CommCloseCbParams &params)
+helperServerFree(helper_server *srv)
 {
-    helper_server *srv = (helper_server *)params.data;
     helper *hlp = srv->parent;
     helper_request *r;
     int i, concurrency = hlp->childs.concurrency;
@@ -704,7 +736,7 @@
     if (!srv->flags.shutdown) {
         assert(hlp->childs.n_active > 0);
         hlp->childs.n_active--;
-        debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << 
srv->index + 1 << " (FD " << params.fd << ") exited");
+        debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << 
srv->index + 1 << " exited");
 
         if (hlp->childs.needNew() > 0) {
             debugs(80, 1, "Too few " << hlp->id_name << " processes are 
running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
@@ -736,9 +768,8 @@
 }
 
 static void
-helperStatefulServerFree(const CommCloseCbParams &params)
+helperStatefulServerFree(helper_stateful_server *srv)
 {
-    helper_stateful_server *srv = (helper_stateful_server *)params.data;
     statefulhelper *hlp = srv->parent;
     helper_stateful_request *r;
 
@@ -766,7 +797,7 @@
     if (!srv->flags.shutdown) {
         assert( hlp->childs.n_active > 0);
         hlp->childs.n_active--;
-        debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << 
" (FD " << params.fd << ") exited");
+        debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << 
" exited");
 
         if (hlp->childs.needNew() > 0) {
             debugs(80, 1, "Too few " << hlp->id_name << " processes are 
running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");

=== modified file 'src/helper.h'
--- src/helper.h        2011-05-13 08:13:01 +0000
+++ src/helper.h        2011-11-22 19:45:58 +0000
@@ -34,6 +34,7 @@
 #define SQUID_HELPER_H
 
 #include "squid.h"
+#include "base/AsyncCall.h"
 #include "cbdata.h"
 #include "comm/forward.h"
 #include "ip/Address.h"
@@ -129,6 +130,8 @@
         unsigned int reserved:1;
     } flags;
 
+    /// the AsyncCall which has been created to destruct this object on FD 
close
+    AsyncCall::Pointer asyncDestructor;
 };
 
 class helper_server : public HelperServerBase

Reply via email to