If you have used any routing_key , please specify that using in.setRoutingKey() . I dont see that one in your code.
On Mon, Jun 26, 2017 at 9:00 PM, <[email protected]> wrote: > I get no exception in the apex.log and yes the queue is durable. > > vhost: / > name: task > auto_delete: False > backing_queue_status.avg_ack_egress_rate: 0.0 > backing_queue_status.avg_ack_ingress_rate: 0.0 > backing_queue_status.avg_egress_rate: 0.0 > backing_queue_status.avg_ingress_rate: 0.5866956420847993 > backing_queue_status.delta: ["delta", "undefined", 0, 0, > "undefined"] > backing_queue_status.len: 31 > backing_queue_status.mode: default > backing_queue_status.next_seq_id: 31 > backing_queue_status.q1: 0 > backing_queue_status.q2: 0 > backing_queue_status.q3: 0 > backing_queue_status.q4: 31 > backing_queue_status.target_ram_count: infinity > consumer_utilisation: None > consumers: 0 > durable: True > exclusive: False > > The goal here is to connect to the RabbitMQ and fetch messages and write > them to the console. I send the messages via a script or directly via the > rabbitmqadmin console. Any Ideas why the program does not read from the > rabbitmq? > > Cheers Manfred. > > > > Am 26.06.2017 um 17:14 schrieb vikram patil: > > Hi Manfred, > > Are you getting any exception in the logs ? Check if your queue is > durable. > > Thanks & Regards, > Vikram > > On Mon, Jun 26, 2017 at 8:37 PM, <[email protected]> wrote: > >> I have a problem getting the connection working with RabbitMQ: >> >> I host the RabbitMQ on the same server the apex application is running. >> >> +--------------------+---------+ >> | name | type | >> +--------------------+---------+ >> | apex | fanout | >> +--------------------+---------+ >> >> +------+----------+ >> | name | messages | >> +------+----------+ >> | task | 31 | >> +------+----------+ >> >> In the program for test issues I declare it this way: >> >> @Override >> public void populateDAG(DAG dag, Configuration conf) >> { >> RabbitMQInputOperator in = dag.addOperator("rabbitInput",new >> RabbitMQInputOperator()); >> in.setHost("localhost"); >> in.setExchange("apex"); >> in.setExchangeType("fanout"); >> in.setQueueName("task"); >> ConsoleOutputOperator console = dag.addOperator("console", new >> ConsoleOutputOperator()); >> dag.addStream("rand_console",in.outputPort, console.input); >> } >> >> But a look at the operators shows that it does not fetch any messages: >> >> { >> "id": "1", >> "name": "rabbitInput", >> "className": "com.datatorrent.contrib.rabbi >> tmq.RabbitMQInputOperator", >> "container": null, >> "host": null, >> "totalTuplesProcessed": "0", >> "totalTuplesEmitted": "0", >> "tuplesProcessedPSMA": "0", >> "tuplesEmittedPSMA": "0", >> "cpuPercentageMA": "0.0", >> "latencyMA": "0", >> "status": "PENDING_DEPLOY", >> "lastHeartbeat": "0", >> "failureCount": "0", >> "recoveryWindowId": "0", >> "currentWindowId": "0", >> "ports": [], >> "unifierClass": null, >> "logicalName": "rabbitInput", >> "recordingId": null, >> "counters": null, >> "metrics": null, >> "checkpointStartTime": "0", >> "checkpointTime": "0", >> "checkpointTimeMA": "0" >> }, >> >> What am I doing wrong here? Since i can configure the RAbbitMQ side is >> there a preferred way of configuration for apex? >> >> Cheers >> >> Manfred. >> >> >> > >
