This patch migrates the Cache Manager to the new IPC subscription API for
handling message events.
To do this, the CacheManager is converted to an AsyncJob which runs from
the point of first action registration until shutdown. The existing
event handlers are moved into its scope as-is.
Requires the IPC subscriptions patch previously submitted.
FUTURE WORK:
Migrate the SNMP messaging as well. I'm not sure whether we want to do
that before, after, or as part of merging the SNMP and mgr backend
message actions.
Amos
--
Please be using
Current Stable Squid 2.7.STABLE9 or 3.1.14
Beta testers wanted for 3.2.0.10
=== modified file 'src/CacheManager.h'
--- src/CacheManager.h 2010-11-04 10:09:05 +0000
+++ src/CacheManager.h 2011-08-13 03:41:21 +0000
@@ -34,6 +34,7 @@
#ifndef SQUID_CACHEMANAGER_H
#define SQUID_CACHEMANAGER_H
+#include "base/AsyncJob.h"
#include "comm/forward.h"
#include "mgr/Action.h"
#include "mgr/ActionProfile.h"
@@ -41,6 +42,11 @@
#include "mgr/forward.h"
#include <vector>
+namespace Ipc
+{
+ class TypedMsgHdr;
+}
+
/**
\defgroup CacheManagerAPI Cache Manager API
\ingroup Components
@@ -57,7 +63,7 @@
* an instance of this class will represent a single independent manager.
* TODO: update documentation to reflect the new singleton model.
*/
-class CacheManager
+class CacheManager : public AsyncJob
{
public:
typedef std::vector<Mgr::ActionProfilePointer> Menu;
@@ -78,8 +84,12 @@
static CacheManager* GetInstance();
const char *ActionProtection(const Mgr::ActionProfilePointer &profile);
+ // AsyncJob API
+ virtual void swanSong();
+ virtual bool doneAll() const;
+
protected:
- CacheManager() {} ///< use Instance() instead
+ CacheManager() : AsyncJob("CacheManager") {} ///< use Instance() instead
Mgr::CommandPointer ParseUrl(const char *url);
void ParseHeaders(const HttpRequest * request, Mgr::ActionParams ¶ms);
@@ -88,8 +98,13 @@
void registerProfile(const Mgr::ActionProfilePointer &profile);
+ // IPC subscriptions
+ void handleIpcRequest(const Ipc::TypedMsgHdr &msg);
+ void handleIpcResponse(const Ipc::TypedMsgHdr &msg);
+
Menu menu_;
+ CBDATA_CLASS2(CacheManager);
private:
static CacheManager* instance;
};
=== modified file 'src/cache_manager.cc'
--- src/cache_manager.cc 2011-07-23 08:37:52 +0000
+++ src/cache_manager.cc 2011-08-13 11:47:45 +0000
@@ -42,6 +42,11 @@
#include "fde.h"
#include "HttpReply.h"
#include "HttpRequest.h"
+#include "ipc/Coordinator.h"
+#include "ipc/Messages.h"
+#include "ipc/Port.h"
+#include "ipc/TypedMsgHdr.h"
+#include "ipc/UdsOp.h"
#include "mgr/ActionCreator.h"
#include "mgr/Action.h"
#include "mgr/ActionProfile.h"
@@ -49,7 +54,10 @@
#include "mgr/Command.h"
#include "mgr/Forwarder.h"
#include "mgr/FunAction.h"
+#include "mgr/Inquirer.h"
#include "mgr/QueryParams.h"
+#include "mgr/Request.h"
+#include "mgr/Response.h"
#include "protos.h" /* rotate_logs() */
#include "SquidTime.h"
#include "Store.h"
@@ -60,6 +68,8 @@
/// \ingroup CacheManagerInternal
#define MGR_PASSWD_SZ 128
+CBDATA_CLASS_INIT(CacheManager);
+
/// creates Action using supplied Action::Create method and command
class ClassActionCreator: public Mgr::ActionCreator
{
@@ -456,6 +466,20 @@
CacheManager* CacheManager::instance=0;
+void
+CacheManager::swanSong()
+{
+ Ipc::ClearListenFor(Ipc::mtCacheMgrRequest);
+ Ipc::ClearListenFor(Ipc::mtCacheMgrResponse);
+}
+
+bool
+CacheManager::doneAll() const
+{
+ // Once started, only exit on shutdown.
+ return shutting_down != 0;
+}
+
/**
\ingroup CacheManagerAPI
* Singleton accessor method.
@@ -467,6 +491,54 @@
debugs(16, 6, "CacheManager::GetInstance: starting cachemanager up");
instance = new CacheManager;
Mgr::RegisterBasics();
+
+ // NP: no need to hold onto these. Since we un-register using the message type.
+ typedef UnaryMemFunT<CacheManager, const Ipc::TypedMsgHdr, const Ipc::TypedMsgHdr&> MgrDialer;
+ typedef AsyncCallT<MgrDialer> IpcMsgCall;
+ Subscription::Pointer sub;
+ RefCount<IpcMsgCall> call;
+
+ // Start accepting Cache Manager Responses
+ call = static_cast<IpcMsgCall*>(asyncCall(16, 3, "CacheManager::handleIpcResponse",
+ MgrDialer(instance, &CacheManager::handleIpcResponse, Ipc::TypedMsgHdr())));
+ sub = new CallSubscription<IpcMsgCall>(call);
+ Ipc::RegisterListenFor(Ipc::mtCacheMgrResponse, sub);
+ debugs(16, 1, "Startup: CacheManager: Accepting manager IPC responses");
+
+ // Start accepting Cache Manager Requests
+ call = static_cast<IpcMsgCall*>(asyncCall(16, 3, "CacheManager::handleIpcRequest",
+ MgrDialer(instance, &CacheManager::handleIpcRequest, Ipc::TypedMsgHdr())));
+ sub = new CallSubscription<IpcMsgCall>(call);
+ Ipc::RegisterListenFor(Ipc::mtCacheMgrRequest, sub);
+
+ debugs(16, 1, "Startup: CacheManager: Accepting manager IPC requests");
}
return instance;
}
+
+void
+CacheManager::handleIpcRequest(const Ipc::TypedMsgHdr &msg)
+{
+ debugs(16, 4, HERE);
+ const Mgr::Request request(msg);
+
+ // Let the strand know that we are now responsible for handling the request
+ Mgr::Response response(request.requestId);
+ Ipc::TypedMsgHdr message;
+ response.pack(message);
+ Ipc::SendMessage(Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId), message);
+
+ // XXX: Just run the action? we are inside an async manager call already.
+ Mgr::Action::Pointer action = createRequestedAction(request.params);
+ if (Ipc::Coordinator::Instance())
+ AsyncJob::Start(new Mgr::Inquirer(action, request, Ipc::Coordinator::Instance()->strands()));
+ // else throw? coordinator scheduled a call and abandoned us.
+}
+
+void
+CacheManager::handleIpcResponse(const Ipc::TypedMsgHdr &msg)
+{
+ debugs(16, 4, HERE);
+ const Mgr::Response response(msg);
+ Mgr::Inquirer::HandleRemoteAck(response);
+}
=== modified file 'src/ipc/Coordinator.cc'
--- src/ipc/Coordinator.cc 2011-07-20 12:38:39 +0000
+++ src/ipc/Coordinator.cc 2011-08-13 07:21:31 +0000
@@ -9,14 +9,10 @@
#include "config.h"
#include "base/Subscription.h"
#include "base/TextException.h"
-#include "CacheManager.h"
#include "comm.h"
#include "comm/Connection.h"
#include "ipc/Coordinator.h"
#include "ipc/SharedListen.h"
-#include "mgr/Inquirer.h"
-#include "mgr/Request.h"
-#include "mgr/Response.h"
#if SQUID_SNMP
#include "snmp/Inquirer.h"
#include "snmp/Request.h"
@@ -68,20 +74,6 @@
handleSharedListenRequest(SharedListenRequest(message));
break;
- case mtCacheMgrRequest: {
- debugs(54, 6, HERE << "Cache manager request");
- const Mgr::Request req(message);
- handleCacheMgrRequest(req);
- }
- break;
-
- case mtCacheMgrResponse: {
- debugs(54, 6, HERE << "Cache manager response");
- const Mgr::Response resp(message);
- handleCacheMgrResponse(resp);
- }
- break;
-
#if SQUID_SNMP
case mtSnmpRequest: {
debugs(54, 6, HERE << "SNMP request");
@@ -134,28 +126,6 @@
SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
}
-void
-Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
-{
- debugs(54, 4, HERE);
-
- // Let the strand know that we are now responsible for handling the request
- Mgr::Response response(request.requestId);
- TypedMsgHdr message;
- response.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
-
- Mgr::Action::Pointer action =
- CacheManager::GetInstance()->createRequestedAction(request.params);
- AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
-}
-
-void
-Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
-{
- Mgr::Inquirer::HandleRemoteAck(response);
-}
-
#if SQUID_SNMP
void
Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
=== modified file 'src/mgr/Request.cc'
--- src/mgr/Request.cc 2011-05-13 08:13:01 +0000
+++ src/mgr/Request.cc 2011-08-11 17:27:32 +0000
@@ -32,6 +32,12 @@
Mgr::Request::Request(const Ipc::TypedMsgHdr& msg):
Ipc::Request(0, 0)
{
+ *this = msg;
+}
+
+Mgr::Request&
+Mgr::Request::operator =(const Ipc::TypedMsgHdr& msg)
+{
msg.checkType(Ipc::mtCacheMgrRequest);
msg.getPod(requestorId);
msg.getPod(requestId);
@@ -41,6 +47,7 @@
conn->fd = msg.getFd();
// For now we just have the FD.
// Address and connectio details wil be pulled/imported by the component later
+ return *this;
}
void
=== modified file 'src/mgr/Request.h'
--- src/mgr/Request.h 2011-05-13 08:13:01 +0000
+++ src/mgr/Request.h 2011-08-11 17:10:11 +0000
@@ -24,6 +24,8 @@
const ActionParams &aParams);
explicit Request(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
+ Request & operator =(const Ipc::TypedMsgHdr& msg);
+
/* Ipc::Request API */
virtual void pack(Ipc::TypedMsgHdr& msg) const;
virtual Pointer clone() const;