On 13/08/11 05:23, Amos Jeffries wrote:
This adds support for components to register to receive AsyncCalls when
IPC messages are received.
The subscription is linked to a message type. With currently one
registered recipient Job per type.
Updated version. Includes a new file ipc/Messages.cc to define the
subscriptions registry global.
Now tested and works.
Amos
--
Please be using
Current Stable Squid 2.7.STABLE9 or 3.1.14
Beta testers wanted for 3.2.0.10
=== modified file 'src/ipc/Coordinator.cc'
--- src/ipc/Coordinator.cc 2011-07-20 12:38:39 +0000
+++ src/ipc/Coordinator.cc 2011-08-13 07:21:31 +0000
@@ -57,6 +53,16 @@
void Ipc::Coordinator::receive(const TypedMsgHdr& message)
{
+ if (MessageSubscriptions[message.type()] != NULL) {
+ typedef UnaryMemFunT<AsyncJob, TypedMsgHdr> Dialer;
+ AsyncCall::Pointer call = MessageSubscriptions[message.type()]->callback();
+ Dialer *d = dynamic_cast<Dialer *>(call->getDialer());
+ assert(d);
+ d->arg1 = message;
+ ScheduleCallHere(call);
+ return;
+ }
+
switch (message.type()) {
case mtRegistration:
debugs(54, 6, HERE << "Registration request");
=== modified file 'src/ipc/Makefile.am'
--- src/ipc/Makefile.am 2011-07-20 09:24:15 +0000
+++ src/ipc/Makefile.am 2011-08-13 03:58:29 +0000
@@ -10,6 +10,7 @@
Kid.h \
Kids.cc \
Kids.h \
+ Messages.cc \
Messages.h \
StartListening.cc \
StartListening.h \
=== added file 'src/ipc/Messages.cc'
--- src/ipc/Messages.cc 1970-01-01 00:00:00 +0000
+++ src/ipc/Messages.cc 2011-08-13 07:33:51 +0000
@@ -0,0 +1,4 @@
+#include "config.h"
+#include "ipc/Messages.h"
+
+Subscription::Pointer Ipc::MessageSubscriptions[Ipc::mtLastEntry];
=== modified file 'src/ipc/Messages.h'
--- src/ipc/Messages.h 2011-02-06 19:50:52 +0000
+++ src/ipc/Messages.h 2011-08-13 10:13:19 +0000
@@ -8,22 +8,52 @@
#ifndef SQUID_IPC_MESSAGES_H
#define SQUID_IPC_MESSAGES_H
-/** Declarations used by various IPC messages */
+//#include "base/AsyncJobCalls.h"
+#include "base/Subscription.h"
+
+/*
+ * Messaging and Message Subscriptions.
+ *
+ * Subscribers must create a Dialer and a AsyncCall to receieve TypedMsgHdr parameters
+ * then generate a new CallSubscription<Dialer>(AsyncCall).
+ * then use RegisterListenFor(message type, subscription) to start receiving IPC messages
+ */
namespace Ipc
{
/// message class identifier
-typedef enum { mtNone = 0, mtRegistration,
+typedef enum { mtNone = 0,
+
+ // IPC worker registration messages
+ mtRegistration,
mtSharedListenRequest, mtSharedListenResponse,
- mtCacheMgrRequest, mtCacheMgrResponse
-#if SQUID_SNMP
- ,
- mtSnmpRequest, mtSnmpResponse
-#endif
+
+ // Cache Manager aand SNMP agent messages
+ mtCacheMgrRequest, mtCacheMgrResponse,
+ mtSnmpRequest, mtSnmpResponse,
+
+ mtLastEntry // dummy (end of list)
} MessageType;
+/// Currently subscribed listeners
+extern Subscription::Pointer MessageSubscriptions[mtLastEntry];
+
+/// subscribe a callback to happen whenever a certain message class arrives
+inline void RegisterListenFor(MessageType m, const Subscription::Pointer &callSub)
+{
+ assert(MessageSubscriptions[m] == NULL);
+ MessageSubscriptions[m] = callSub;
+}
+
+/// stop listening for message of the given type.
+/// already scheduled events may exist and will happen
+/// but future messages will be dropped.
+inline void ClearListenFor(MessageType m)
+{
+ MessageSubscriptions[m] = NULL;
+}
+
} // namespace Ipc;
-
#endif /* SQUID_IPC_MESSAGES_H */
=== modified file 'src/ipc/TypedMsgHdr.h'
--- src/ipc/TypedMsgHdr.h 2010-11-21 04:40:05 +0000
+++ src/ipc/TypedMsgHdr.h 2011-08-13 03:19:17 +0000
@@ -8,6 +8,9 @@
#ifndef SQUID_IPC_TYPED_MSG_HDR_H
#define SQUID_IPC_TYPED_MSG_HDR_H
+#if HAVE_OSTREAM
+#include <ostream>
+#endif
#if HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
@@ -99,4 +102,10 @@
} // namespace Ipc
+inline std::ostream &
+operator <<(std::ostream &o, const Ipc::TypedMsgHdr &m)
+{
+ return o << "IPC message [type=" << m.type() << "]";
+}
+
#endif /* SQUID_IPC_TYPED_MSG_HDR_H */