Hello,
I've set up user-defined messaging to send commands from a spectator
manager to the partitions of my cluster.
The messages are sent with sendAndWait, and I defined a custom callback class.
The problem is that, although the messages are
correctly sent to all partitions, no reply is ever received, so
sendAndWait always times out (after 10 s, which should be more than enough).
I am using Helix 0.8.2, and all tests run on a standalone instance. Does anyone
have an idea of what I am doing wrong?
Thanks!
---------------------------------
Here are the relevant parts of the source code, followed by a log:
The message handler:
---8<---
class CustomMessageHandler extends MessageHandler {
    CustomMessageHandler(Message message, NotificationContext context) {
        super(message, context);
    }

    @Override
    public HelixTaskResult handleMessage() {
        LOGGER.info("handleMessage");
        assert _message != null;
        String msgSubType = getMessage().getMsgSubType();
        LOGGER.info("callback message : {}", _message);
        // do some stuff
        // Build the result map that should be returned to the sender
        HelixTaskResult result = new HelixTaskResult();
        result.getTaskResultMap().put("PARTITION_NAME", getMessage().getPartitionName());
        result.getTaskResultMap().put("ACTION", msgSubType);
        result.setSuccess(true);
        return result;
    }

    @Override
    public void onError(Exception e, ErrorCode code, ErrorType type) {
        LOGGER.error("Error while processing command: ", e);
    }
}
---8<---
Registration of the message handler factory on a participant:
---8<---
CustomMessageHandlerFactory messageHandlerFactory = new CustomMessageHandlerFactory();
messagingService.registerMessageHandlerFactory(
    Message.MessageType.USER_DEFINE_MSG.name(), messageHandlerFactory);
---8<---
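To make explicit what I expect on the participant side once the factory is registered, here is a toy, Helix-free model. Every class and method in it is hypothetical (not Helix API), and it reflects my assumption about the behavior rather than Helix's actual code: an incoming message is dispatched by type to the registered handler, and if the request carries a correlation id (my log shows one being registered on the spectator side), the handler's result map is sent back as a reply addressed to the original sender.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Toy model only: every class and method here is hypothetical, not Helix API.
public class ToyParticipantDispatch {

    // Minimal message: type, sender, target, optional correlation id, payload map.
    static final class ToyMessage {
        final String type;
        final String src;
        final String tgt;
        final String correlationId; // null when the sender expects no reply
        final Map<String, String> payload;

        ToyMessage(String type, String src, String tgt, String correlationId,
                   Map<String, String> payload) {
            this.type = type;
            this.src = src;
            this.tgt = tgt;
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    // One handler per message type, in the spirit of registerMessageHandlerFactory(type, factory).
    private final Map<String, Function<ToyMessage, Map<String, String>>> handlers = new HashMap<>();

    void register(String type, Function<ToyMessage, Map<String, String>> handler) {
        handlers.put(type, handler);
    }

    // Runs the handler for the message type; builds a reply only for correlated requests.
    Optional<ToyMessage> onMessage(ToyMessage msg, String selfName) {
        Function<ToyMessage, Map<String, String>> handler = handlers.get(msg.type);
        if (handler == null) {
            return Optional.empty(); // no handler registered for this type
        }
        Map<String, String> result = handler.apply(msg);
        if (msg.correlationId == null) {
            return Optional.empty(); // fire-and-forget: nothing to send back
        }
        // The reply travels from this participant back to the original sender,
        // tagged with the request's correlation id so the sender can match it.
        return Optional.of(new ToyMessage("TASK_REPLY", selfName, msg.src, msg.correlationId, result));
    }

    public static void main(String[] args) {
        ToyParticipantDispatch participant = new ToyParticipantDispatch();
        participant.register("USER_DEFINE_MSG", m -> {
            Map<String, String> r = new HashMap<>();
            r.put("ACTION", m.payload.get("MSG_SUBTYPE"));
            return r;
        });
        Map<String, String> payload = new HashMap<>();
        payload.put("MSG_SUBTYPE", "FLUSH");
        ToyMessage request =
            new ToyMessage("USER_DEFINE_MSG", "localhost-CMD", "participant-0", "corr-1", payload);
        participant.onMessage(request, "participant-0")
            .ifPresent(reply -> System.out.println(reply.type + " to " + reply.tgt + ": " + reply.payload));
    }
}
```

In this model the reply is the only thing that can complete the sender's wait, so a handler that succeeds but whose reply never reaches the sender looks, from the sender's side, exactly like a handler that never ran.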
The callbackOnReply class passed to sendAndWait:
---8<---
public static class ControllerMessageCallBack extends AsyncCallback {
    // Invoked once for each reply received from a recipient
    @Override
    public synchronized final void onReplyMessage(Message message) {
        LOGGER.info("onReplyMessage {} => {}", message.getResourceName(), message.toString());
    }

    // Invoked when the timeout expires before all recipients have replied
    @Override
    public synchronized final void onTimeOut() {
        LOGGER.info("onTimeOut: nb messages replied: {}", _messageReplied.size());
        for (Message m : _messageReplied) {
            LOGGER.info("onTimeOut: REPLIED {} ", m);
        }
    }
}
---8<---
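For reference, this is my understanding of what such a callback keeps track of, as a toy model. The class and its methods are hypothetical and not Helix's AsyncCallback: each reply is recorded as it arrives, the round trip is complete once the number of replies equals the number of messages sent, and a timeout only matters while replies are still missing. With 20 messages sent and nothing recorded, this is the state my onTimeOut log line reports.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a reply-counting callback; hypothetical, not Helix's AsyncCallback.
public class ToyReplyCallback {
    private final int expected;                        // number of messages sent
    private final List<String> replied = new ArrayList<>();
    private boolean timedOut = false;

    public ToyReplyCallback(int expected) {
        this.expected = expected;
    }

    // Records one reply; returns true once every sent message has been answered.
    public synchronized boolean onReply(String reply) {
        if (!timedOut) {
            replied.add(reply); // late replies after a timeout are dropped in this model
        }
        return isDone();
    }

    // Fired by a timer at the deadline; only counts as a timeout if replies are missing.
    public synchronized void onTimeOut() {
        if (!isDone()) {
            timedOut = true;
        }
    }

    public synchronized boolean isDone() {
        return replied.size() == expected;
    }

    public synchronized boolean isTimedOut() {
        return timedOut;
    }

    public synchronized int repliedCount() {
        return replied.size();
    }

    public static void main(String[] args) {
        ToyReplyCallback cb = new ToyReplyCallback(20); // 20 messages sent, as in my log
        cb.onTimeOut();                                  // deadline reached with no reply
        System.out.println("timed out: " + cb.isTimedOut() + ", replied: " + cb.repliedCount());
    }
}
```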
And finally, sending a message:
---8<---
ClusterMessagingService messagingService =
    distributionManagerSpectator.getMessagingService();

// Construct the message
Message request = new Message(Message.MessageType.USER_DEFINE_MSG,
    UUID.randomUUID().toString());
request.setMsgSubType(action.name());
request.setMsgState(Message.MessageState.NEW);

// Set the recipient criteria: all nodes that satisfy the criteria will receive
// the message
Criteria recipientCriteria = new Criteria();
recipientCriteria.setInstanceName("%");
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setPartition("%");
// Should be processed only by process(es) that are active at the time the
// message is sent: if a recipient is restarted after the message is sent,
// the message will not be processed.
recipientCriteria.setSessionSpecific(true);

// The handler that will be invoked when any recipient responds to the message
CustomMessageHandlerFactory.ControllerMessageCallBack responseHandler =
    new CustomMessageHandlerFactory.ControllerMessageCallBack();

// This returns only after all recipients respond or after the timeout
int timeout = 10000; // ms
int sentMessageCount = messagingService.sendAndWait(recipientCriteria, request,
    responseHandler, timeout);
---8<---
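To illustrate the symptom, here is a toy model of the send-and-wait round trip built on a CountDownLatch. The names are hypothetical and this is not how DefaultMessagingService is implemented, only the behavior I expect from it: the sender counts down once per reply and blocks until the count reaches zero or the timeout expires. If the recipients handle the message but no reply ever reaches the sender, the call returns the full sent count after the timeout with zero replies, which matches my log (sentMessageCount: 20, nb messages replied: 0).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy model of "send to N recipients, then block until N replies or a timeout".
// Hypothetical names; this is not Helix's DefaultMessagingService.
public class ToySendAndWait {

    // Result of one round trip: messages sent, replies received, and whether we timed out.
    public static final class Outcome {
        public final int sent;
        public final int replied;
        public final boolean timedOut;

        Outcome(int sent, int replied, boolean timedOut) {
            this.sent = sent;
            this.replied = replied;
            this.timedOut = timedOut;
        }
    }

    // Each recipient receives a "reply" Runnable that it may or may not invoke.
    public static Outcome sendAndWait(List<Consumer<Runnable>> recipients, long timeoutMs)
            throws InterruptedException {
        CountDownLatch pending = new CountDownLatch(recipients.size());
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, recipients.size()));
        try {
            for (Consumer<Runnable> recipient : recipients) {
                // "Deliver" the message; the recipient runs the callback to reply.
                pool.submit(() -> recipient.accept(pending::countDown));
            }
            boolean allReplied = pending.await(timeoutMs, TimeUnit.MILLISECONDS);
            int replied = recipients.size() - (int) pending.getCount();
            return new Outcome(recipients.size(), replied, !allReplied);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Consumer<Runnable>> silent = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            silent.add(reply -> { /* handles the message but never replies */ });
        }
        Outcome o = sendAndWait(silent, 200);
        System.out.println("sent=" + o.sent + " replied=" + o.replied + " timedOut=" + o.timedOut);
    }
}
```

The return value only counts messages sent, so a successful send with a lost reply path is indistinguishable from slow recipients until the timeout fires.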
And the log from the spectator run:
INF|06/083625.845 c.e.w.c.HelixConfiguration@main Helix: localhost:2199 cluster: standalone ressourceName: crawlDB
WAR|06/083625.849 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR [initializing]
WAR|06/083625.849 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [creating]
INF|06/083625.856 o.a.h.m.z.ZKHelixManager@main Create a zk-based cluster manager. zkSvr: localhost:2199, clusterName: standalone, instanceName: localhost-CMD, type: SPECTATOR
INF|06/083625.858 o.a.h.HelixManagerProperties@main load helix-manager properties: {minimum_supported_version.batch_message=0.6.1, clustermanager.version=0.8.2, minimum_supported_version.participant=0.4}
INF|06/083625.936 o.a.h.m.h.HelixTaskExecutor@main Registered message handler factory for type: TASK_REPLY, poolSize: 40, factory: org.apache.helix.messaging.handling.AsyncCallbackService@6950e31, pool: java.util.concurrent.ThreadPoolExecutor@b7dd107[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
WAR|06/083625.959 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [created]
WAR|06/083625.959 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [initializing]
WAR|06/083625.959 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [initialized]
WAR|06/083625.959 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [connecting]
INF|06/083625.959 o.a.h.m.z.ZKHelixManager@main ClusterManager.connect()
DEB|06/083625.968 o.I.z.ZkConnection@main Creating new ZookKeeper instance to connect to localhost:2199.
INF|06/083625.968 o.a.h.m.z.z.ZkClient@3-localhost:2199 Starting ZkClient event thread.
DEB|06/083625.983 o.a.h.m.z.z.ZkClient@main Awaiting connection to Zookeeper server
DEB|06/083625.983 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.070 o.a.h.m.z.z.ZkClient@main-EventThread Received event: WatchedEvent state:SyncConnected type:None path:null
INF|06/083626.071 o.a.h.m.z.z.ZkClient@main-EventThread zookeeper state changed (SyncConnected)
DEB|06/083626.071 o.a.h.m.z.z.ZkClient@main-EventThread Leaving process event
DEB|06/083626.071 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.088 o.a.h.m.z.ZKHelixManager@main KeeperState: SyncConnected, instance: localhost-CMD, type: SPECTATOR, zookeeper:State:CONNECTED Timeout:30000 sessionid:0x10043eabe2e0009 local:/127.0.0.1:52432 remoteserver:localhost/127.0.0.1:2199 lastZxid:0 xid:1 sent:1 recv:1 queuedpkts:0 pendingresp:0 queuedevents:0
INF|06/083626.088 o.a.h.m.z.ZKHelixManager@main Handle new session, instance: localhost-CMD, type: SPECTATOR
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.089 o.a.h.m.z.ZKHelixManager@main Handling new session, session id: 10043eabe2e0009, instance: localhost-CMD, instanceTye: SPECTATOR, cluster: standalone, zkconnection: State:CONNECTED Timeout:30000 sessionid:0x10043eabe2e0009 local:/127.0.0.1:52432 remoteserver:localhost/127.0.0.1:2199 lastZxid:0 xid:1 sent:1 recv:1 queuedpkts:0 pendingresp:0 queuedevents:0
WAR|06/083626.108 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [connected]
WAR|06/083626.108 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR [initialized]
INF|06/083626.126 o.a.h.m.AsyncCallback@main Setting time out to -1 ms
INF|06/083626.126 c.e.w.c.ControllerMessage@main sent a message 9b88c7fd-8c5e-4aef-9459-81ac9a3d931e, {CREATE_TIMESTAMP=1575617786125, MSG_ID=9b88c7fd-8c5e-4aef-9459-81ac9a3d931e, MSG_STATE=new, MSG_SUBTYPE=FLUSH, MSG_TYPE=USER_DEFINE_MSG}{}{}
DEB|06/083626.126 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.126 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.172 o.a.h.m.CriteriaEvaluator@main Query returned 20 rows
DEB|06/083626.173 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.173 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.177 o.a.h.m.DefaultMessagingService@main Send 20 messages with criteria instanceName=%resourceName=partitionName=%partitionState=
INF|06/083626.178 o.a.h.m.AsyncCallback@main Setting time out to 10000 ms
INF|06/083626.178 o.a.h.m.h.AsyncCallbackService@main registering correlation id 806daf51-5216-4f6c-beaf-3e8ea42e4b5e
DEB|06/083626.178 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.178 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.192 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.192 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.194 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.194 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.197 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.197 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.201 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.201 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.204 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.204 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.214 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.214 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.217 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.217 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.219 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.220 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.230 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.230 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.241 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.242 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.267 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.267 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.284 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.285 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.294 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.294 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.298 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.298 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.317 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.317 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.335 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.335 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.347 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.347 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.350 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.350 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.360 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.360 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083636.364 c.e.w.d.z.c.CustomMessageHandlerFactory@Timer-1 onTimeOut: nb messages replied: 0
INF|06/083636.365 c.e.w.c.ControllerMessage@main sentMessageCount: 20
WAR|06/083636.365 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR [closing]
WAR|06/083636.365 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [disconnecting]
INF|06/083636.365 o.a.h.m.z.ZKHelixManager@main disconnect localhost-CMD(SPECTATOR) from standalone
INF|06/083636.365 o.a.h.m.h.HelixTaskExecutor@main Shutting down HelixTaskExecutor
INF|06/083636.366 o.a.h.m.h.HelixTaskExecutor@main Reset HelixTaskExecutor
INF|06/083636.366 o.a.h.m.h.HelixTaskExecutor@main Reset exectuor for msgType: TASK_REPLY, pool: java.util.concurrent.ThreadPoolExecutor@b7dd107[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
INF|06/083636.366 o.a.h.m.h.HelixTaskExecutor@main Shutting down pool: java.util.concurrent.ThreadPoolExecutor@b7dd107[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
INF|06/083636.367 o.a.h.m.h.HelixTaskExecutor@main
INF|06/083636.367 o.a.h.m.h.HelixTaskExecutor@main Shutdown HelixTaskExecutor finished
INF|06/083636.368 o.a.h.m.z.z.ZkClient@main Closing zkclient: State:CONNECTED Timeout:30000 sessionid:0x10043eabe2e0009 local:/127.0.0.1:52432 remoteserver:localhost/127.0.0.1:2199 lastZxid:581 xid:61 sent:62 recv:62 queuedpkts:0 pendingresp:0 queuedevents:0
INF|06/083636.368 o.a.h.m.z.z.ZkClient@3-localhost:2199 Terminate ZkClient event thread.
INF|06/083636.369 o.a.h.m.z.z.ZkClient@3-localhost:2199 Terminate ZkClient event thread.
DEB|06/083636.369 o.I.z.ZkConnection@main Closing ZooKeeper connected to localhost:2199
INF|06/083636.373 o.a.h.m.z.z.ZkClient@main Closed zkclient
INF|06/083636.373 o.a.h.m.z.ZKHelixManager@main Cluster manager: localhost-CMD disconnected
WAR|06/083636.373 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [disconnected]
WAR|06/083636.373 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR [closed]