I am trying to write a simple "Hello World" application using
Spark Streaming and RabbitMQ, in which Spark Streaming reads a
message from RabbitMQ via the RabbitMQ receiver
<https://github.com/Stratio/rabbitmq-receiver> and prints it to the
console. However, the string read from RabbitMQ never appears in the
console. Instead, the Spark Streaming code prints the following:
|Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
Value Received BlockRDD[2] at ReceiverInputDStream at RabbitMQInputDStream.scala:33 |
The message is sent to RabbitMQ with the simple code below:
|package helloWorld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    private final static String QUEUE_NAME = "hello1";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World! is a code. Hi Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
} |
I am trying to read the messages with Spark Streaming as shown below:
|package rabbitmq.example;

import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.stratio.receiver.RabbitMQUtils;

public class RabbitMqEx {

    public static void main(String[] args) {
        System.out.println("Creating Spark Configuration");
        SparkConf conf = new SparkConf();
        conf.setAppName("RabbitMq Receiver Example");
        conf.setMaster("local[2]");

        System.out.println("Retreiving Streaming Context from Spark Conf");
        JavaStreamingContext streamCtx = new JavaStreamingContext(conf, Durations.seconds(2));

        Map<String, String> rabbitMqConParams = new HashMap<String, String>();
        rabbitMqConParams.put("host", "localhost");
        rabbitMqConParams.put("queueName", "hello1");

        System.out.println("Trying to connect to RabbitMq");
        JavaReceiverInputDStream<String> receiverStream =
                RabbitMQUtils.createJavaStream(streamCtx, rabbitMqConParams);

        receiverStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> arg0) throws Exception {
                System.out.println("Value Received " + arg0.toString());
                return null;
            }
        });

        streamCtx.start();
        streamCtx.awaitTermination();
    }
} |
The console output contains only messages like the following:
|Creating Spark Configuration
Retreiving Streaming Context from Spark Conf
Trying to connect to RabbitMq
Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
Value Received BlockRDD[2] at ReceiverInputDStream at RabbitMQInputDStream.scala:33 |
In the logs I see the following:
|15/11/18 13:20:45 INFO SparkContext: Running Spark version 1.5.2
15/11/18 13:20:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/18 13:20:45 WARN Utils: Your hostname, jabong1143 resolves to a loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface wlan0)
15/11/18 13:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/11/18 13:20:45 INFO SecurityManager: Changing view acls to: jabong
15/11/18 13:20:45 INFO SecurityManager: Changing modify acls to: jabong
15/11/18 13:20:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jabong); users with modify permissions: Set(jabong)
15/11/18 13:20:46 INFO Slf4jLogger: Slf4jLogger started
15/11/18 13:20:46 INFO Remoting: Starting remoting
15/11/18 13:20:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.3:42978]
15/11/18 13:20:46 INFO Utils: Successfully started service 'sparkDriver' on port 42978.
15/11/18 13:20:46 INFO SparkEnv: Registering MapOutputTracker
15/11/18 13:20:46 INFO SparkEnv: Registering BlockManagerMaster
15/11/18 13:20:46 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-9309b35f-a506-49dc-91ab-5c340cd3bdd1
15/11/18 13:20:46 INFO MemoryStore: MemoryStore started with capacity 947.7 MB
15/11/18 13:20:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-736f4b9c-764c-4b85-9b37-1cece102c95a/httpd-29196fa0-eb3f-4b7d-97ad-35c5325b09e5
15/11/18 13:20:46 INFO HttpServer: Starting HTTP Server
15/11/18 13:20:46 INFO Utils: Successfully started service 'HTTP file server' on port 37150.
15/11/18 13:20:46 INFO SparkEnv: Registering OutputCommitCoordinator
15/11/18 13:20:52 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/11/18 13:20:52 INFO SparkUI: Started SparkUI at http://192.168.1.3:4040
15/11/18 13:20:52 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/11/18 13:20:52 INFO Executor: Starting executor ID driver on host localhost
15/11/18 13:20:52 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 47306.
15/11/18 13:20:52 INFO NettyBlockTransferService: Server created on 47306
15/11/18 13:20:52 INFO BlockManagerMaster: Trying to register BlockManager
15/11/18 13:20:52 INFO BlockManagerMasterEndpoint: Registering block manager localhost:47306 with 947.7 MB RAM, BlockManagerId(driver, localhost, 47306)
15/11/18 13:20:52 INFO BlockManagerMaster: Registered BlockManager
Trying to connect to RabbitMq
15/11/18 13:20:53 INFO ReceiverTracker: Starting 1 receivers
15/11/18 13:20:53 INFO ReceiverTracker: ReceiverTracker started
15/11/18 13:20:53 INFO ForEachDStream: metadataCleanupDelay = -1
15/11/18 13:20:53 INFO RabbitMQInputDStream: metadataCleanupDelay = -1
15/11/18 13:20:53 INFO RabbitMQInputDStream: Slide time = 2000 ms
15/11/18 13:20:53 INFO RabbitMQInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/11/18 13:20:53 INFO RabbitMQInputDStream: Checkpoint interval = null
15/11/18 13:20:53 INFO RabbitMQInputDStream: Remember duration = 2000 ms
15/11/18 13:20:53 INFO RabbitMQInputDStream: Initialized and validated com.stratio.receiver.RabbitMQInputDStream@5d00adc2
15/11/18 13:20:53 INFO ForEachDStream: Slide time = 2000 ms
15/11/18 13:20:53 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/11/18 13:20:53 INFO ForEachDStream: Checkpoint interval = null
15/11/18 13:20:53 INFO ForEachDStream: Remember duration = 2000 ms
15/11/18 13:20:53 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@4c132773
15/11/18 13:20:53 INFO RecurringTimer: Started timer for JobGenerator at time 1447833054000
15/11/18 13:20:53 INFO JobGenerator: Started JobGenerator at 1447833054000 ms
15/11/18 13:20:53 INFO JobScheduler: Started JobScheduler
15/11/18 13:20:53 INFO StreamingContext: StreamingContext started
15/11/18 13:20:53 INFO DAGScheduler: Got job 0 (start at RabbitMqEx.java:38) with 1 output partitions
15/11/18 13:20:53 INFO DAGScheduler: Final stage: ResultStage 0(start at RabbitMqEx.java:38)
15/11/18 13:20:53 INFO ReceiverTracker: Receiver 0 started
15/11/18 13:20:53 INFO DAGScheduler: Parents of final stage: List()
15/11/18 13:20:53 INFO DAGScheduler: Missing parents: List()
15/11/18 13:20:53 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556), which has no missing parents
15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(46496) called with curMem=0, maxMem=993735475
15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 45.4 KB, free 947.7 MB)
15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(15206) called with curMem=46496, maxMem=993735475
15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.8 KB, free 947.6 MB)
15/11/18 13:20:53 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:47306 (size: 14.8 KB, free: 947.7 MB)
15/11/18 13:20:53 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
15/11/18 13:20:53 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556)
15/11/18 13:20:53 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/11/18 13:20:53 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, NODE_LOCAL, 2729 bytes)
15/11/18 13:20:53 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/11/18 13:20:53 INFO RecurringTimer: Started timer for BlockGenerator at time 1447833053800
15/11/18 13:20:53 INFO BlockGenerator: Started BlockGenerator
15/11/18 13:20:53 INFO BlockGenerator: Started block pushing thread
15/11/18 13:20:53 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.1.3:42978
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Starting receiver
15/11/18 13:20:53 INFO RabbitMQReceiver: Rabbit host addresses are :localhost
15/11/18 13:20:53 INFO RabbitMQReceiver: Address localhost
15/11/18 13:20:53 INFO RabbitMQReceiver: creating new connection and channel
15/11/18 13:20:53 INFO RabbitMQReceiver: No virtual host configured
15/11/18 13:20:53 INFO RabbitMQReceiver: created new connection and channel
15/11/18 13:20:53 INFO RabbitMQReceiver: onStart, Connecting..
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStart
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
15/11/18 13:20:53 INFO RabbitMQReceiver: declaring direct queue
15/11/18 13:20:53 ERROR RabbitMQReceiver: Got this unknown exception: java.io.IOException
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126)
    at com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86)
    at com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 5 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
    at java.lang.Thread.run(Thread.java:745)
15/11/18 13:20:53 INFO RabbitMQReceiver: it has been stopped
15/11/18 13:20:53 ERROR RabbitMQReceiver: error on close channel, ignoring
15/11/18 13:20:53 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Trying to connect again
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Trying to connect again:
15/11/18 13:20:53 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing...
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStop
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Deregistering receiver 0
15/11/18 13:20:53 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopped receiver 0
15/11/18 13:20:54 INFO JobScheduler: Added jobs for time 1447833054000 ms
15/11/18 13:20:54 INFO JobScheduler: Starting job streaming job 1447833054000 ms.0 from job set of time 1447833054000 ms
Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
15/11/18 13:20:54 INFO JobScheduler: Finished job streaming job 1447833054000 ms.0 from job set of time 1447833054000 ms
15/11/18 13:20:54 INFO JobScheduler: Total delay: 0.031 s for time 1447833054000 ms (execution: 0.007 s)
15/11/18 13:20:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/11/18 13:20:54 INFO InputInfoTracker: remove old batch metadata:
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver again
15/11/18 13:20:55 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.1.3:42978
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver
15/11/18 13:20:55 INFO RabbitMQReceiver: Rabbit host addresses are :localhost
15/11/18 13:20:55 INFO RabbitMQReceiver: Address localhost
15/11/18 13:20:55 INFO RabbitMQReceiver: creating new connection and channel
15/11/18 13:20:55 INFO RabbitMQReceiver: No virtual host configured
15/11/18 13:20:55 INFO RabbitMQReceiver: created new connection and channel
15/11/18 13:20:55 INFO RabbitMQReceiver: onStart, Connecting..
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStart
15/11/18 13:20:55 INFO RabbitMQReceiver: declaring direct queue
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Receiver started again
15/11/18 13:20:55 ERROR RabbitMQReceiver: Got this unknown exception: java.io.IOException
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126)
    at com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86)
    at com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 5 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
    at java.lang.Thread.run(Thread.java:745)
15/11/18 13:20:55 INFO RabbitMQReceiver: it has been stopped
15/11/18 13:20:55 ERROR RabbitMQReceiver: error on close channel, ignoring
15/11/18 13:20:55 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Trying to connect again
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Trying to connect again:
15/11/18 13:20:55 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing...
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStop
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Deregistering receiver 0
15/11/18 13:20:55 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopped receiver 0
15/11/18 13:20:56 INFO JobScheduler: Added jobs for time 1447833056000 ms
15/11/18 13:20:56 INFO JobScheduler: Starting job streaming job 1447833056000 ms.0 from job set of time 1447833056000 ms |
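As far as I can tell, the PRECONDITION_FAILED lines in the log above mean that the receiver declares hello1 with durable set to true, while Send created the queue with durable set to false (the second argument of queueDeclare). The sketch below is only a toy in-memory model of that redeclaration check, written to make the rule concrete. The class DurableMismatchDemo and its declare method are hypothetical names, and none of this is RabbitMQ's or the receiver's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of queue declaration; NOT RabbitMQ's implementation.
class DurableMismatchDemo {

    // Hypothetical stand-in for a broker's queue table: queue name -> durable flag.
    private final Map<String, Boolean> queues = new HashMap<>();

    // Returns "OK" when the queue is created or redeclared with the same flag,
    // otherwise a message shaped like the reply-text seen in the log.
    String declare(String name, boolean durable) {
        Boolean current = queues.get(name);
        if (current == null) {
            queues.put(name, durable);   // first declaration creates the queue
            return "OK";
        }
        if (current == durable) {
            return "OK";                 // equivalent redeclaration changes nothing
        }
        return "PRECONDITION_FAILED - inequivalent arg 'durable' for queue '"
                + name + "': received '" + durable
                + "' but current is '" + current + "'";
    }

    public static void main(String[] args) {
        DurableMismatchDemo broker = new DurableMismatchDemo();
        // Send declares hello1 with durable=false.
        System.out.println(broker.declare("hello1", false));
        // The log suggests the receiver then declares it with durable=true.
        System.out.println(broker.declare("hello1", true));
    }
}
```

If this reading is right, the two declarations would have to agree, for example by having the sender declare the queue as durable after the existing hello1 queue is deleted. I have not verified that this is what the receiver expects.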
Running |list_queues| shows the following:
|sudo rabbitmqctl list_queues
Listing queues ...
hello1 2 |
I also printed the value of |arg0.count|; it reports 0. It seems
Spark Streaming is not able to read messages from RabbitMQ.
However, I can read from the queue using a simple Java receiver like the
one here:
<https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java>.
Environment:
* RabbitMQ 3.5.6
* Spark 1.5.2
* Java 8 (Update 66)
Can someone tell me what is going wrong and how I can read messages
from RabbitMQ via Spark Streaming?
Thanks,
D