Hello,

  I have set up user-defined messaging so that I can send commands from a spectator 
manager to my cluster partitions.
The messages are sent with sendAndWait, and I defined a custom callback class. 
The problem is that although the messages are
correctly sent to all partitions, no reply is ever received, and 
therefore sendAndWait always times out (after 10 s, which should be more than sufficient).

I am using Helix 0.8.2, and all tests are run on a standalone instance. Does 
anyone have an idea of what I am doing wrong?

Thanks !

---------------------------------

Here are the relevant parts of the source code and a log:

The message handler:

---8<---
class CustomMessageHandler extends MessageHandler {

  CustomMessageHandler(Message message,
                       NotificationContext context) {
    super(message, context);
  }

  @Override
  public HelixTaskResult handleMessage() {
    LOGGER.info("handleMessage");
    assert(_message != null);
    String msgSubType = getMessage().getMsgSubType();

    LOGGER.info("callback message : {}", _message.toString());

    // do some stuff

    HelixTaskResult result = new HelixTaskResult();
    result.getTaskResultMap().put("PARTITION_NAME",
        getMessage().getPartitionName());
    result.getTaskResultMap().put("ACTION", msgSubType);
    result.setSuccess(true);

    return result;
  }

  @Override
  public void onError(Exception e, ErrorCode code, ErrorType type) {
    LOGGER.error("Error while processing command: ", e);
  }
}
---8<---


Registration of the message handler to a participant:
---8<---
CustomMessageHandlerFactory messageHandlerFactory =
    new CustomMessageHandlerFactory();
messagingService.registerMessageHandlerFactory(
    Message.MessageType.USER_DEFINE_MSG.name(), messageHandlerFactory);
---8<---


The callbackOnReply:
---8<---
public static class ControllerMessageCallBack extends AsyncCallback
{
  public synchronized final void onReplyMessage(Message message) {
    LOGGER.info("onReplyMessage {} => {}", message.getResourceName(),
        message.toString());
  }
  public synchronized final void onTimeOut() {
    LOGGER.info("onTimeOut: nb messages replied: {}", _messageReplied.size());
    for (Message m :_messageReplied)
      LOGGER.info("onTimeOut: REPLIED {} ", m);
  }
}
---8<---


And finally, the code that sends a message:

---8<---
ClusterMessagingService messagingService =
    distributionManagerSpectator.getMessagingService();

// Construct the Message
Message request = new Message(Message.MessageType.USER_DEFINE_MSG,
    UUID.randomUUID().toString());
request.setMsgSubType(action.name());
request.setMsgState(Message.MessageState.NEW);

// Set the Recipient criteria: all nodes that satisfy the criteria will receive
// the message
Criteria recipientCriteria = new Criteria();
recipientCriteria.setInstanceName("%");
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setPartition("%");

// Should be processed only by process(es) that are active at the time of
// sending the message.
// This means if the recipient is restarted after the message is sent, it will
// not be processed.
recipientCriteria.setSessionSpecific(true);

// the handler that will be invoked when any recipient responds to the message.
CustomMessageHandlerFactory.ControllerMessageCallBack responseHandler =
  new CustomMessageHandlerFactory.ControllerMessageCallBack();

// this will return only after all recipients respond or after timeout
int timeout = 10000;
int sentMessageCount = messagingService.sendAndWait(recipientCriteria, request,
    responseHandler, timeout);
---8<---
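
To make the behaviour I expect concrete, here is a minimal, self-contained sketch of the "return after all recipients respond or after timeout" pattern described in the comment above. It is a toy model built only on java.util.concurrent, not Helix code and not a claim about how Helix implements sendAndWait. The names ReplyWaitSketch and ReplyCollector, and the threads that stand in for recipients, are all hypothetical and exist only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of "send N messages, then block until N replies or a timeout". Not Helix code. */
class ReplyWaitSketch {

  /** Hypothetical stand-in for a reply callback: counts replies and releases the waiter. */
  static final class ReplyCollector {
    private final CountDownLatch pending;
    private final AtomicInteger replied = new AtomicInteger();

    ReplyCollector(int expectedReplies) {
      this.pending = new CountDownLatch(expectedReplies);
    }

    /** Called once per reply, possibly from several threads. */
    void onReply() {
      replied.incrementAndGet();
      pending.countDown();
    }

    /** Blocks until every expected reply has arrived or the timeout elapses. */
    boolean await(long timeoutMs) {
      try {
        return pending.await(timeoutMs, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt flag
        return false;
      }
    }

    int repliedCount() {
      return replied.get();
    }
  }

  /**
   * Simulates sending to `recipients` nodes of which only `repliers` answer.
   * Returns true only if all recipients replied before the timeout.
   */
  static boolean sendAndWait(int recipients, int repliers, long timeoutMs) {
    ReplyCollector collector = new ReplyCollector(recipients);
    for (int i = 0; i < repliers; i++) {
      new Thread(collector::onReply).start(); // each fake recipient replies once
    }
    boolean complete = collector.await(timeoutMs);
    System.out.println("replied " + collector.repliedCount() + " of " + recipients
        + ", complete=" + complete);
    return complete;
  }

  public static void main(String[] args) {
    sendAndWait(20, 20, 1000); // every recipient replies
    sendAndWait(20, 0, 200);   // nobody replies: waits for the full timeout
  }
}
```

In this model, the second call (20 recipients, no replies) waits for the whole timeout and reports failure, which is the same symptom as in the log below: 20 messages sent, 0 replies counted in onTimeOut.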


INF|06/083625.845 c.e.w.c.HelixConfiguration@main Helix: localhost:2199 
cluster: standalone ressourceName: crawlDB
WAR|06/083625.849 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR [initializing]
WAR|06/083625.849 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [creating]
INF|06/083625.856 o.a.h.m.z.ZKHelixManager@main Create a zk-based cluster 
manager. zkSvr: localhost:2199, clusterName: standalone, instanceName: 
localhost-CMD, type: SPECTATOR
INF|06/083625.858 o.a.h.HelixManagerProperties@main load helix-manager 
properties: {minimum_supported_version.batch_message=0.6.1, 
clustermanager.version=0.8.2, minimum_supported_version.participant=0.4}
INF|06/083625.936 o.a.h.m.h.HelixTaskExecutor@main Registered message handler 
factory for type: TASK_REPLY, poolSize: 40, factory: 
org.apache.helix.messaging.handling.AsyncCallbackService@6950e31, pool: 
java.util.concurrent.ThreadPoolExecutor@b7dd107[Running, pool size = 0, active 
threads = 0, queued tasks = 0, completed tasks = 0]
WAR|06/083625.959 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [created]
WAR|06/083625.959 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [initializing]
WAR|06/083625.959 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [initialized]
WAR|06/083625.959 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [connecting]
INF|06/083625.959 o.a.h.m.z.ZKHelixManager@main ClusterManager.connect()
DEB|06/083625.968 o.I.z.ZkConnection@main Creating new ZookKeeper instance to 
connect to localhost:2199.
INF|06/083625.968 o.a.h.m.z.z.ZkClient@3-localhost:2199 Starting ZkClient event 
thread.
DEB|06/083625.983 o.a.h.m.z.z.ZkClient@main Awaiting connection to Zookeeper 
server
DEB|06/083625.983 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.070 o.a.h.m.z.z.ZkClient@main-EventThread Received event: 
WatchedEvent state:SyncConnected type:None path:null
INF|06/083626.071 o.a.h.m.z.z.ZkClient@main-EventThread zookeeper state changed 
(SyncConnected)
DEB|06/083626.071 o.a.h.m.z.z.ZkClient@main-EventThread Leaving process event
DEB|06/083626.071 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.088 o.a.h.m.z.ZKHelixManager@main KeeperState: SyncConnected, 
instance: localhost-CMD, type: SPECTATOR, zookeeper:State:CONNECTED 
Timeout:30000 sessionid:0x10043eabe2e0009 local:/127.0.0.1:52432 
remoteserver:localhost/127.0.0.1:2199 lastZxid:0 xid:1 sent:1 recv:1 
queuedpkts:0 pendingresp:0 queuedevents:0
INF|06/083626.088 o.a.h.m.z.ZKHelixManager@main Handle new session, instance: 
localhost-CMD, type: SPECTATOR
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.089 o.a.h.m.z.ZKHelixManager@main Handling new session, session 
id: 10043eabe2e0009, instance: localhost-CMD, instanceTye: SPECTATOR, cluster: 
standalone, zkconnection: State:CONNECTED Timeout:30000 
sessionid:0x10043eabe2e0009 local:/127.0.0.1:52432 
remoteserver:localhost/127.0.0.1:2199 lastZxid:0 xid:1 sent:1 recv:1 
queuedpkts:0 pendingresp:0 queuedevents:0
WAR|06/083626.108 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [connected]
WAR|06/083626.108 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR [initialized]
INF|06/083626.126 o.a.h.m.AsyncCallback@main Setting time out to -1 ms
INF|06/083626.126 c.e.w.c.ControllerMessage@main sent a message 
9b88c7fd-8c5e-4aef-9459-81ac9a3d931e, {CREATE_TIMESTAMP=1575617786125, 
MSG_ID=9b88c7fd-8c5e-4aef-9459-81ac9a3d931e, MSG_STATE=new, MSG_SUBTYPE=FLUSH, 
MSG_TYPE=USER_DEFINE_MSG}{}{}
DEB|06/083626.126 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.126 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.172 o.a.h.m.CriteriaEvaluator@main Query returned 20 rows
DEB|06/083626.173 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.173 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.177 o.a.h.m.DefaultMessagingService@main Send 20 messages with 
criteria instanceName=%resourceName=partitionName=%partitionState=
INF|06/083626.178 o.a.h.m.AsyncCallback@main Setting time out to 10000 ms
INF|06/083626.178 o.a.h.m.h.AsyncCallbackService@main registering correlation 
id 806daf51-5216-4f6c-beaf-3e8ea42e4b5e
DEB|06/083626.178 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.178 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.192 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.192 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.194 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.194 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.197 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.197 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.201 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.201 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.204 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.204 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.214 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.214 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.217 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.217 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.219 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.220 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.230 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.230 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.241 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.242 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.267 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.267 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.284 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.285 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.294 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.294 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.298 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.298 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.317 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.317 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.335 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.335 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.347 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.347 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.350 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.350 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.360 o.a.h.m.z.z.ZkClient@main Waiting for keeper state 
SyncConnected
DEB|06/083626.360 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083636.364 c.e.w.d.z.c.CustomMessageHandlerFactory@Timer-1 onTimeOut: nb 
messages replied: 0
INF|06/083636.365 c.e.w.c.ControllerMessage@main sentMessageCount: 20
WAR|06/083636.365 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR [closing]
WAR|06/083636.365 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [disconnecting]
INF|06/083636.365 o.a.h.m.z.ZKHelixManager@main disconnect 
localhost-CMD(SPECTATOR) from standalone
INF|06/083636.365 o.a.h.m.h.HelixTaskExecutor@main Shutting down 
HelixTaskExecutor
INF|06/083636.366 o.a.h.m.h.HelixTaskExecutor@main Reset HelixTaskExecutor
INF|06/083636.366 o.a.h.m.h.HelixTaskExecutor@main Reset exectuor for msgType: 
TASK_REPLY, pool: java.util.concurrent.ThreadPoolExecutor@b7dd107[Running, pool 
size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
INF|06/083636.366 o.a.h.m.h.HelixTaskExecutor@main Shutting down pool: 
java.util.concurrent.ThreadPoolExecutor@b7dd107[Running, pool size = 0, active 
threads = 0, queued tasks = 0, completed tasks = 0]
INF|06/083636.367 o.a.h.m.h.HelixTaskExecutor@main
INF|06/083636.367 o.a.h.m.h.HelixTaskExecutor@main Shutdown HelixTaskExecutor 
finished
INF|06/083636.368 o.a.h.m.z.z.ZkClient@main Closing zkclient: State:CONNECTED 
Timeout:30000 sessionid:0x10043eabe2e0009 local:/127.0.0.1:52432 
remoteserver:localhost/127.0.0.1:2199 lastZxid:581 xid:61 sent:62 recv:62 
queuedpkts:0 pendingresp:0 queuedevents:0
INF|06/083636.368 o.a.h.m.z.z.ZkClient@3-localhost:2199 Terminate ZkClient 
event thread.
INF|06/083636.369 o.a.h.m.z.z.ZkClient@3-localhost:2199 Terminate ZkClient 
event thread.
DEB|06/083636.369 o.I.z.ZkConnection@main Closing ZooKeeper connected to 
localhost:2199
INF|06/083636.373 o.a.h.m.z.z.ZkClient@main Closed zkclient
INF|06/083636.373 o.a.h.m.z.ZKHelixManager@main Cluster manager: localhost-CMD 
disconnected
WAR|06/083636.373 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [disconnected]
WAR|06/083636.373 c.e.w.d.DistributionManager@main 
localhost-CMD/standalone/crawlDB/SPECTATOR [closed]

