Hi,

Just answered on Github:
https://github.com/Stratio/rabbitmq-receiver/issues/20

Regards

Daniel Carroza Santana

Vía de las Dos Castillas, 33, Ática 4, 3ª Planta.
28224 Pozuelo de Alarcón. Madrid.
Tel: +34 91 828 64 73 // *@stratiobd <https://twitter.com/StratioBD>*

2015-11-19 10:02 GMT+01:00 D <subharaj.ma...@gmail.com>:

> I am trying to write a simple "Hello World" kind of application using
> spark streaming and RabbitMq, in which Apache Spark Streaming will read
> message from RabbitMq via the RabbitMqReceiver
> <https://github.com/Stratio/rabbitmq-receiver> and print it in the
> console. But some how I am not able to print the string read from Rabbit Mq
> into console. The spark streaming code is printing the message below:-
>
> Value Received BlockRDD[1] at ReceiverInputDStream at 
> RabbitMQInputDStream.scala:33
> Value Received BlockRDD[2] at ReceiverInputDStream at 
> RabbitMQInputDStream.scala:33
>
>
> The message is sent to the rabbitmq via the simple code below:-
>
> package helloWorld;
>
> import com.rabbitmq.client.Channel;
> import com.rabbitmq.client.Connection;
> import com.rabbitmq.client.ConnectionFactory;
>
> public class Send {
>
>   private final static String QUEUE_NAME = "hello1";
>
>   public static void main(String[] argv) throws Exception {
>     ConnectionFactory factory = new ConnectionFactory();
>     factory.setHost("localhost");
>     Connection connection = factory.newConnection();
>     Channel channel = connection.createChannel();
>
>     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
>     String message = "Hello World! is a code. Hi Hello World!";
>     channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
>     System.out.println(" [x] Sent '" + message + "'");
>
>     channel.close();
>     connection.close();
>   }
> }
>
>
> I am trying to read messages via Apache Streaming as shown below:-
>
> package rabbitmq.example;
>
> import java.util.*;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> import com.stratio.receiver.RabbitMQUtils;
>
> public class RabbitMqEx {
>
>     public static void main(String[] args) {
>         System.out.println("Creating    Spark   Configuration");
>         SparkConf conf = new SparkConf();
>         conf.setAppName("RabbitMq Receiver Example");
>         conf.setMaster("local[2]");
>
>         System.out.println("Retreiving  Streaming   Context from    Spark   
> Conf");
>         JavaStreamingContext streamCtx = new JavaStreamingContext(conf,
>                 Durations.seconds(2));
>
>         Map<String, String>rabbitMqConParams = new HashMap<String, String>();
>         rabbitMqConParams.put("host", "localhost");
>         rabbitMqConParams.put("queueName", "hello1");
>         System.out.println("Trying to connect to RabbitMq");
>         JavaReceiverInputDStream<String> receiverStream = 
> RabbitMQUtils.createJavaStream(streamCtx, rabbitMqConParams);
>         receiverStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
>             @Override
>             public Void call(JavaRDD<String> arg0) throws Exception {
>                 System.out.println("Value Received " + arg0.toString());
>                 return null;
>             }
>         } );
>         streamCtx.start();
>         streamCtx.awaitTermination();
>     }
> }
>
> The output console only has message like the following:-
>
> Creating    Spark   Configuration
> Retreiving  Streaming   Context from    Spark   Conf
> Trying to connect to RabbitMq
> Value Received BlockRDD[1] at ReceiverInputDStream at 
> RabbitMQInputDStream.scala:33
> Value Received BlockRDD[2] at ReceiverInputDStream at 
> RabbitMQInputDStream.scala:33
>
>
> In the logs I see the following:-
>
> 15/11/18 13:20:45 INFO SparkContext: Running Spark version 1.5.2
> 15/11/18 13:20:45 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 15/11/18 13:20:45 WARN Utils: Your hostname, jabong1143 resolves to a 
> loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface wlan0)
> 15/11/18 13:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
> another address
> 15/11/18 13:20:45 INFO SecurityManager: Changing view acls to: jabong
> 15/11/18 13:20:45 INFO SecurityManager: Changing modify acls to: jabong
> 15/11/18 13:20:45 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(jabong); users 
> with modify permissions: Set(jabong)
> 15/11/18 13:20:46 INFO Slf4jLogger: Slf4jLogger started
> 15/11/18 13:20:46 INFO Remoting: Starting remoting
> 15/11/18 13:20:46 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp://sparkDriver@192.168.1.3:42978]
> 15/11/18 13:20:46 INFO Utils: Successfully started service 'sparkDriver' on 
> port 42978.
> 15/11/18 13:20:46 INFO SparkEnv: Registering MapOutputTracker
> 15/11/18 13:20:46 INFO SparkEnv: Registering BlockManagerMaster
> 15/11/18 13:20:46 INFO DiskBlockManager: Created local directory at 
> /tmp/blockmgr-9309b35f-a506-49dc-91ab-5c340cd3bdd1
> 15/11/18 13:20:46 INFO MemoryStore: MemoryStore started with capacity 947.7 MB
> 15/11/18 13:20:46 INFO HttpFileServer: HTTP File server directory is 
> /tmp/spark-736f4b9c-764c-4b85-9b37-1cece102c95a/httpd-29196fa0-eb3f-4b7d-97ad-35c5325b09e5
> 15/11/18 13:20:46 INFO HttpServer: Starting HTTP Server
> 15/11/18 13:20:46 INFO Utils: Successfully started service 'HTTP file server' 
> on port 37150.
> 15/11/18 13:20:46 INFO SparkEnv: Registering OutputCommitCoordinator
> 15/11/18 13:20:52 INFO Utils: Successfully started service 'SparkUI' on port 
> 4040.
> 15/11/18 13:20:52 INFO SparkUI: Started SparkUI at http://192.168.1.3:4040
> 15/11/18 13:20:52 WARN MetricsSystem: Using default name DAGScheduler for 
> source because spark.app.id is not set.
> 15/11/18 13:20:52 INFO Executor: Starting executor ID driver on host localhost
> 15/11/18 13:20:52 INFO Utils: Successfully started service 
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 47306.
> 15/11/18 13:20:52 INFO NettyBlockTransferService: Server created on 47306
> 15/11/18 13:20:52 INFO BlockManagerMaster: Trying to register BlockManager
> 15/11/18 13:20:52 INFO BlockManagerMasterEndpoint: Registering block manager 
> localhost:47306 with 947.7 MB RAM, BlockManagerId(driver, localhost, 47306)
> 15/11/18 13:20:52 INFO BlockManagerMaster: Registered BlockManager
> Trying to connect to RabbitMq
> 15/11/18 13:20:53 INFO ReceiverTracker: Starting 1 receivers
> 15/11/18 13:20:53 INFO ReceiverTracker: ReceiverTracker started
> 15/11/18 13:20:53 INFO ForEachDStream: metadataCleanupDelay = -1
> 15/11/18 13:20:53 INFO RabbitMQInputDStream: metadataCleanupDelay = -1
> 15/11/18 13:20:53 INFO RabbitMQInputDStream: Slide time = 2000 ms
> 15/11/18 13:20:53 INFO RabbitMQInputDStream: Storage level = 
> StorageLevel(false, false, false, false, 1)
> 15/11/18 13:20:53 INFO RabbitMQInputDStream: Checkpoint interval = null
> 15/11/18 13:20:53 INFO RabbitMQInputDStream: Remember duration = 2000 ms
> 15/11/18 13:20:53 INFO RabbitMQInputDStream: Initialized and validated 
> com.stratio.receiver.RabbitMQInputDStream@5d00adc2
> 15/11/18 13:20:53 INFO ForEachDStream: Slide time = 2000 ms
> 15/11/18 13:20:53 INFO ForEachDStream: Storage level = StorageLevel(false, 
> false, false, false, 1)
> 15/11/18 13:20:53 INFO ForEachDStream: Checkpoint interval = null
> 15/11/18 13:20:53 INFO ForEachDStream: Remember duration = 2000 ms
> 15/11/18 13:20:53 INFO ForEachDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.ForEachDStream@4c132773
> 15/11/18 13:20:53 INFO RecurringTimer: Started timer for JobGenerator at time 
> 1447833054000
> 15/11/18 13:20:53 INFO JobGenerator: Started JobGenerator at 1447833054000 ms
> 15/11/18 13:20:53 INFO JobScheduler: Started JobScheduler
> 15/11/18 13:20:53 INFO StreamingContext: StreamingContext started
> 15/11/18 13:20:53 INFO DAGScheduler: Got job 0 (start at RabbitMqEx.java:38) 
> with 1 output partitions
> 15/11/18 13:20:53 INFO DAGScheduler: Final stage: ResultStage 0(start at 
> RabbitMqEx.java:38)
> 15/11/18 13:20:53 INFO ReceiverTracker: Receiver 0 started
> 15/11/18 13:20:53 INFO DAGScheduler: Parents of final stage: List()
> 15/11/18 13:20:53 INFO DAGScheduler: Missing parents: List()
> 15/11/18 13:20:53 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 
> ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556), which has 
> no missing parents
> 15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(46496) called with 
> curMem=0, maxMem=993735475
> 15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0 stored as values in 
> memory (estimated size 45.4 KB, free 947.7 MB)
> 15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(15206) called with 
> curMem=46496, maxMem=993735475
> 15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes 
> in memory (estimated size 14.8 KB, free 947.6 MB)
> 15/11/18 13:20:53 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory 
> on localhost:47306 (size: 14.8 KB, free: 947.7 MB)
> 15/11/18 13:20:53 INFO SparkContext: Created broadcast 0 from broadcast at 
> DAGScheduler.scala:861
> 15/11/18 13:20:53 INFO DAGScheduler: Submitting 1 missing tasks from 
> ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at 
> ReceiverTracker.scala:556)
> 15/11/18 13:20:53 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
> 15/11/18 13:20:53 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
> localhost, NODE_LOCAL, 2729 bytes)
> 15/11/18 13:20:53 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 15/11/18 13:20:53 INFO RecurringTimer: Started timer for BlockGenerator at 
> time 1447833053800
> 15/11/18 13:20:53 INFO BlockGenerator: Started BlockGenerator
> 15/11/18 13:20:53 INFO BlockGenerator: Started block pushing thread
> 15/11/18 13:20:53 INFO ReceiverTracker: Registered receiver for stream 0 from 
> 192.168.1.3:42978
> 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Starting receiver
> 15/11/18 13:20:53 INFO RabbitMQReceiver: Rabbit host addresses are :localhost
> 15/11/18 13:20:53 INFO RabbitMQReceiver: Address localhost
> 15/11/18 13:20:53 INFO RabbitMQReceiver: creating new connection and channel
> 15/11/18 13:20:53 INFO RabbitMQReceiver: No virtual host configured
> 15/11/18 13:20:53 INFO RabbitMQReceiver: created new connection and channel
> 15/11/18 13:20:53 INFO RabbitMQReceiver: onStart, Connecting..
> 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStart
> 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Waiting for receiver to be 
> stopped
> 15/11/18 13:20:53 INFO RabbitMQReceiver: declaring direct queue
> 15/11/18 13:20:53 ERROR RabbitMQReceiver: Got this unknown exception: 
> java.io.IOException
> java.io.IOException
>     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
>     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
>     at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
>     at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
>     at 
> com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126)
>     at 
> com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86)
>     at 
> com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; 
> protocol method: #method<channel.close>(reply-code=406, 
> reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 
> 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, 
> method-id=10)
>     at 
> com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
>     at 
> com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
>     at 
> com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
>     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
>     ... 5 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; 
> protocol method: #method<channel.close>(reply-code=406, 
> reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 
> 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, 
> method-id=10)
>     at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
>     at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
>     at 
> com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
>     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
>     at 
> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
>     at java.lang.Thread.run(Thread.java:745)
> 15/11/18 13:20:53 INFO RabbitMQReceiver: it has been stopped
> 15/11/18 13:20:53 ERROR RabbitMQReceiver: error on close channel, ignoring
> 15/11/18 13:20:53 WARN ReceiverSupervisorImpl: Restarting receiver with delay 
> 2000 ms: Trying to connect again
> 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopping receiver with 
> message: Restarting receiver with delay 2000ms: Trying to connect again:
> 15/11/18 13:20:53 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing...
> 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStop
> 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Deregistering receiver 0
> 15/11/18 13:20:53 ERROR ReceiverTracker: Deregistered receiver for stream 0: 
> Restarting receiver with delay 2000ms: Trying to connect again
> 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopped receiver 0
> 15/11/18 13:20:54 INFO JobScheduler: Added jobs for time 1447833054000 ms
> 15/11/18 13:20:54 INFO JobScheduler: Starting job streaming job 1447833054000 
> ms.0 from job set of time 1447833054000 ms
> Value Received BlockRDD[1] at ReceiverInputDStream at 
> RabbitMQInputDStream.scala:33
> 15/11/18 13:20:54 INFO JobScheduler: Finished job streaming job 1447833054000 
> ms.0 from job set of time 1447833054000 ms
> 15/11/18 13:20:54 INFO JobScheduler: Total delay: 0.031 s for time 
> 1447833054000 ms (execution: 0.007 s)
> 15/11/18 13:20:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
> 15/11/18 13:20:54 INFO InputInfoTracker: remove old batch metadata:
> 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver again
> 15/11/18 13:20:55 INFO ReceiverTracker: Registered receiver for stream 0 from 
> 192.168.1.3:42978
> 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver
> 15/11/18 13:20:55 INFO RabbitMQReceiver: Rabbit host addresses are :localhost
> 15/11/18 13:20:55 INFO RabbitMQReceiver: Address localhost
> 15/11/18 13:20:55 INFO RabbitMQReceiver: creating new connection and channel
> 15/11/18 13:20:55 INFO RabbitMQReceiver: No virtual host configured
> 15/11/18 13:20:55 INFO RabbitMQReceiver: created new connection and channel
> 15/11/18 13:20:55 INFO RabbitMQReceiver: onStart, Connecting..
> 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStart
> 15/11/18 13:20:55 INFO RabbitMQReceiver: declaring direct queue
> 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Receiver started again
> 15/11/18 13:20:55 ERROR RabbitMQReceiver: Got this unknown exception: 
> java.io.IOException
> java.io.IOException
>     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
>     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
>     at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
>     at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
>     at 
> com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126)
>     at 
> com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86)
>     at 
> com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; 
> protocol method: #method<channel.close>(reply-code=406, 
> reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 
> 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, 
> method-id=10)
>     at 
> com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
>     at 
> com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
>     at 
> com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
>     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
>     ... 5 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; 
> protocol method: #method<channel.close>(reply-code=406, 
> reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 
> 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, 
> method-id=10)
>     at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
>     at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
>     at 
> com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
>     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
>     at 
> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
>     at java.lang.Thread.run(Thread.java:745)
> 15/11/18 13:20:55 INFO RabbitMQReceiver: it has been stopped
> 15/11/18 13:20:55 ERROR RabbitMQReceiver: error on close channel, ignoring
> 15/11/18 13:20:55 WARN ReceiverSupervisorImpl: Restarting receiver with delay 
> 2000 ms: Trying to connect again
> 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopping receiver with 
> message: Restarting receiver with delay 2000ms: Trying to connect again:
> 15/11/18 13:20:55 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing...
> 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStop
> 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Deregistering receiver 0
> 15/11/18 13:20:55 ERROR ReceiverTracker: Deregistered receiver for stream 0: 
> Restarting receiver with delay 2000ms: Trying to connect again
> 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopped receiver 0
> 15/11/18 13:20:56 INFO JobScheduler: Added jobs for time 1447833056000 ms
> 15/11/18 13:20:56 INFO JobScheduler: Starting job streaming job 1447833056000 
> ms.0 from job set of time 1447833056000 ms
>
>
> Doing list_queues list the following:-
>
> sudo rabbitmqctl list_queues
> Listing queues ...
> hello1  2
>
>
> I also printed the value of arg0.count. It is reporting 0. It seems spark
> streaming is not able to read messages from rabbitmq.
>
> However I can read from the queue using a simple java receiver as
> mentioned here
> <https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java>
> .
>
> Environment
>
>    - RabbitMq Version - 3.5.6
>    - Spark 1.5.2
>    - Java 8 (Update 66)
>
> Can some one let me know what is going wrong and how can I read message
> from RabbitMq via Spark Streaming.
>
> Thanks,
> D
>
>
>
>
>
>
>
>

Reply via email to