On 13/08/11 05:23, Amos Jeffries wrote:
This adds support for components to register to receive AsyncCalls when
IPC messages are received.

Each subscription is linked to a message type, with currently one
registered recipient Job per type.


Updated version. It includes a new file, ipc/Messages.cc, which defines the global subscriptions registry.

Now tested and works.

Amos
--
Please be using
  Current Stable Squid 2.7.STABLE9 or 3.1.14
  Beta testers wanted for 3.2.0.10
=== modified file 'src/ipc/Coordinator.cc'
--- src/ipc/Coordinator.cc	2011-07-20 12:38:39 +0000
+++ src/ipc/Coordinator.cc	2011-08-13 07:21:31 +0000
@@ -57,6 +53,16 @@
 
 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
 {
+    if (MessageSubscriptions[message.type()] != NULL) {
+        typedef UnaryMemFunT<AsyncJob, TypedMsgHdr> Dialer;
+        AsyncCall::Pointer call = MessageSubscriptions[message.type()]->callback();
+        Dialer *d = dynamic_cast<Dialer *>(call->getDialer());
+        assert(d);
+        d->arg1 = message;
+        ScheduleCallHere(call);
+        return;
+    }
+
     switch (message.type()) {
     case mtRegistration:
         debugs(54, 6, HERE << "Registration request");
=== modified file 'src/ipc/Makefile.am'
--- src/ipc/Makefile.am	2011-07-20 09:24:15 +0000
+++ src/ipc/Makefile.am	2011-08-13 03:58:29 +0000
@@ -10,6 +10,7 @@
 	Kid.h \
 	Kids.cc \
 	Kids.h \
+	Messages.cc \
 	Messages.h \
 	StartListening.cc \
 	StartListening.h \

=== added file 'src/ipc/Messages.cc'
--- src/ipc/Messages.cc	1970-01-01 00:00:00 +0000
+++ src/ipc/Messages.cc	2011-08-13 07:33:51 +0000
@@ -0,0 +1,4 @@
+#include "config.h"
+#include "ipc/Messages.h"
+
+Subscription::Pointer Ipc::MessageSubscriptions[Ipc::mtLastEntry];

=== modified file 'src/ipc/Messages.h'
--- src/ipc/Messages.h	2011-02-06 19:50:52 +0000
+++ src/ipc/Messages.h	2011-08-13 10:13:19 +0000
@@ -8,22 +8,52 @@
 #ifndef SQUID_IPC_MESSAGES_H
 #define SQUID_IPC_MESSAGES_H
 
-/** Declarations used by various IPC messages */
+//#include "base/AsyncJobCalls.h"
+#include "base/Subscription.h"
+
+/*
+ * Messaging and Message Subscriptions.
+ *
+ * Subscribers must create a Dialer and an AsyncCall to receive TypedMsgHdr parameters,
+ * then generate a new CallSubscription<Dialer>(AsyncCall),
+ * then use RegisterListenFor(message type, subscription) to start receiving IPC messages.
+ */
 
 namespace Ipc
 {
 
 /// message class identifier
-typedef enum { mtNone = 0, mtRegistration,
+typedef enum { mtNone = 0,
+
+               // IPC worker registration messages
+               mtRegistration,
                mtSharedListenRequest, mtSharedListenResponse,
-               mtCacheMgrRequest, mtCacheMgrResponse
-#if SQUID_SNMP
-               ,
-               mtSnmpRequest, mtSnmpResponse
-#endif
+
+               // Cache Manager and SNMP agent messages
+               mtCacheMgrRequest, mtCacheMgrResponse,
+               mtSnmpRequest, mtSnmpResponse,
+
+               mtLastEntry // dummy (end of list)
              } MessageType;
 
+/// Currently subscribed listeners
+extern Subscription::Pointer MessageSubscriptions[mtLastEntry];
+
+/// subscribe a callback to happen whenever a certain message class arrives
+inline void RegisterListenFor(MessageType m, const Subscription::Pointer &callSub)
+{
+    assert(MessageSubscriptions[m] == NULL);
+    MessageSubscriptions[m] = callSub;
+}
+
+/// Stop listening for messages of the given type.
+/// Already scheduled events may exist and will still happen,
+/// but future messages will be dropped.
+inline void ClearListenFor(MessageType m)
+{
+    MessageSubscriptions[m] = NULL;
+}
+
 } // namespace Ipc;
 
-
 #endif /* SQUID_IPC_MESSAGES_H */

=== modified file 'src/ipc/TypedMsgHdr.h'
--- src/ipc/TypedMsgHdr.h	2010-11-21 04:40:05 +0000
+++ src/ipc/TypedMsgHdr.h	2011-08-13 03:19:17 +0000
@@ -8,6 +8,9 @@
 #ifndef SQUID_IPC_TYPED_MSG_HDR_H
 #define SQUID_IPC_TYPED_MSG_HDR_H
 
+#if HAVE_OSTREAM
+#include <ostream>
+#endif
 #if HAVE_SYS_SOCKET_H
 #include <sys/socket.h>
 #endif
@@ -99,4 +102,10 @@
 
 } // namespace Ipc
 
+inline std::ostream &
+operator <<(std::ostream &o, const Ipc::TypedMsgHdr &m)
+{
+    return o << "IPC message [type=" << m.type() << "]";
+}
+
 #endif /* SQUID_IPC_TYPED_MSG_HDR_H */

Reply via email to