This adds support for components to register to receive AsyncCalls when IPC messages are received.

The subscription is linked to a message type. With currently one registered recipient Job per type.


There is a followup patch I'm still testing. Which converts the CacheManager into an AsyncJob and moves the mtCacheManager* message handling functions into it.

Amos
--
Please be using
  Current Stable Squid 2.7.STABLE9 or 3.1.14
  Beta testers wanted for 3.2.0.10
=== modified file 'src/ipc/Coordinator.cc'
--- src/ipc/Coordinator.cc	2011-07-20 12:38:39 +0000
+++ src/ipc/Coordinator.cc	2011-08-11 17:13:04 +0000
@@ -57,6 +57,16 @@
 
 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
 {
+    if (MessageSubscriptions[message.type()] != NULL) {
+        typedef UnaryMemFunT<AsyncJob, TypedMsgHdr> Dialer;
+        AsyncCall::Pointer call = MessageSubscriptions[message.type()]->callback();
+        Dialer *d = dynamic_cast<Dialer *>(call->getDialer());
+        assert(d);
+        d->arg1 = message;
+        ScheduleCallHere(call);
+        return;
+    }
+
     switch (message.type()) {
     case mtRegistration:
         debugs(54, 6, HERE << "Registration request");
=== modified file 'src/ipc/Messages.h'
--- src/ipc/Messages.h	2011-02-06 19:50:52 +0000
+++ src/ipc/Messages.h	2011-08-12 17:00:59 +0000
@@ -8,22 +8,52 @@
 #ifndef SQUID_IPC_MESSAGES_H
 #define SQUID_IPC_MESSAGES_H
 
-/** Declarations used by various IPC messages */
+//#include "base/AsyncJobCalls.h"
+#include "base/Subscription.h"
+
+/*
+ * Messaging and Message Subscriptions.
+ *
+ * Subscribers must create a Dialer and a AsyncCall to receieve TypedMsgHdr parameters
+ * then generate a new CallSubscription<Dialer>(AsyncCall).
+ * then use RegisterListenFor(message type, subscription) to start receiving IPC messages
+ */
 
 namespace Ipc
 {
 
 /// message class identifier
-typedef enum { mtNone = 0, mtRegistration,
+typedef enum { mtNone = 0,
+
+               // IPC worker registration messages
+               mtRegistration,
                mtSharedListenRequest, mtSharedListenResponse,
-               mtCacheMgrRequest, mtCacheMgrResponse
-#if SQUID_SNMP
-               ,
-               mtSnmpRequest, mtSnmpResponse
-#endif
+
+               // Cache Manager aand SNMP agent messages
+               mtCacheMgrRequest, mtCacheMgrResponse,
+               mtSnmpRequest, mtSnmpResponse,
+
+               mtLastEntry // dummy (end of list)
              } MessageType;
 
+/// Currently subscribed listeners
+extern Subscription::Pointer MessageSubscriptions[mtLastEntry];
+
+/// subscribe a callback to happen whenever a certain message class arrives
+inline void RegisterListenFor(MessageType m, const Subscription::Pointer &callSub)
+{
+    assert(MessageSubscriptions[m] != NULL);
+    MessageSubscriptions[m] = callSub;
+}
+
+/// stop listening for message of the given type.
+/// already scheduled events may exist and will happen
+/// but future messages will be dropped.
+inline void ClearListenFor(MessageType m)
+{
+    MessageSubscriptions[m] = NULL;
+}
+
 } // namespace Ipc;
 
-
 #endif /* SQUID_IPC_MESSAGES_H */

=== modified file 'src/ipc/TypedMsgHdr.h'
--- src/ipc/TypedMsgHdr.h	2010-11-21 04:40:05 +0000
+++ src/ipc/TypedMsgHdr.h	2011-08-12 14:06:09 +0000
@@ -8,6 +8,9 @@
 #ifndef SQUID_IPC_TYPED_MSG_HDR_H
 #define SQUID_IPC_TYPED_MSG_HDR_H
 
+#if HAVE_OSTREAM
+#include <ostream>
+#endif
 #if HAVE_SYS_SOCKET_H
 #include <sys/socket.h>
 #endif
@@ -99,4 +102,10 @@
 
 } // namespace Ipc
 
+std::ostream &
+operator <<(std::ostream &o, const Ipc::TypedMsgHdr &m)
+{
+    return o << "IPC message [type=" << m.type() << "]";
+}
+
 #endif /* SQUID_IPC_TYPED_MSG_HDR_H */

Reply via email to