I am having trouble getting the connection to RabbitMQ working.
RabbitMQ is hosted on the same server that the Apex application runs on. The exchange and the queue look like this:
+--------------------+---------+
| name               | type    |
+--------------------+---------+
| apex               | fanout  |
+--------------------+---------+
+------+----------+
| name | messages |
+------+----------+
| task | 31       |
+------+----------+
For testing purposes, I declare the operator in the program this way:
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
  in.setHost("localhost");
  in.setExchange("apex");
  in.setExchangeType("fanout");
  in.setQueueName("task");
  ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
  dag.addStream("rand_console", in.outputPort, console.input);
}
But a look at the operators shows that it does not fetch any messages:
{
"id": "1",
"name": "rabbitInput",
"className": "com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator",
"container": null,
"host": null,
"totalTuplesProcessed": "0",
"totalTuplesEmitted": "0",
"tuplesProcessedPSMA": "0",
"tuplesEmittedPSMA": "0",
"cpuPercentageMA": "0.0",
"latencyMA": "0",
"status": "PENDING_DEPLOY",
"lastHeartbeat": "0",
"failureCount": "0",
"recoveryWindowId": "0",
"currentWindowId": "0",
"ports": [],
"unifierClass": null,
"logicalName": "rabbitInput",
"recordingId": null,
"counters": null,
"metrics": null,
"checkpointStartTime": "0",
"checkpointTime": "0",
"checkpointTimeMA": "0"
},
What am I doing wrong here? Since I can configure the RabbitMQ side, is
there a preferred configuration for Apex?
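For example, would it be better to set these values in the application's properties file instead of in code? Below is an untested sketch of what I have in mind. The key pattern dt.operator.<operatorName>.prop.<propertyName> is my assumption about how operator properties are addressed, and the property names are simply derived from the setters I call above.

```xml
<!-- Untested sketch: the key pattern is an assumption; "rabbitInput" is the
     operator name passed to dag.addOperator in populateDAG. -->
<configuration>
  <property>
    <name>dt.operator.rabbitInput.prop.host</name>
    <value>localhost</value>
  </property>
  <property>
    <name>dt.operator.rabbitInput.prop.exchange</name>
    <value>apex</value>
  </property>
  <property>
    <name>dt.operator.rabbitInput.prop.exchangeType</name>
    <value>fanout</value>
  </property>
  <property>
    <name>dt.operator.rabbitInput.prop.queueName</name>
    <value>task</value>
  </property>
</configuration>
```

The values are the same ones I currently set in code, so this would only move the configuration, not change it.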
Cheers
Manfred.