Hi, I am new to Spark. I have written custom rabbit mq receiver which calls the store method of Receiver interface. I can see that the block is being stored. I am trying to process each rdd in the dstream using the foreach function, but am unable to figure out why this block is not getting invoked with a populated rdd even after the data is stored (by the receiver).
/This code initialises the stream:/ public void testListeningToStateChannel() throws IOException { // Create the context with a 1 second batch size SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver"); sparkConf.setMaster("local"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); JavaDStream<String> customReceiverStream = ssc.receiverStream(new StateBusListener("abcde")); customReceiverStream.foreachRDD(doSomething); customReceiverStream.print(); ssc.start(); System.out.println("started :"); ssc.awaitTermination(); } public static Function2<JavaRDD<String>, Time, Void> doSomething = new Function2<JavaRDD<String>, Time, Void>() { @Override public Void call(JavaRDD<String> v1, Time v2) throws Exception { System.out.println(v1.toString()); if (!v1.collect().isEmpty()) { long count = v1.count(); System.out.println("found :" + count); } return null; } }; /This is the receiver :/ public class StateBusListener extends Receiver<String> { private String QUEUE_NAME; private Channel stateBusChannel; private Connection connection = null; public StateBusListener(String queue_name) { super(StorageLevel.MEMORY_AND_DISK_2()); QUEUE_NAME = queue_name; } @Override public void onStart() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { connection = factory.newConnection(); stateBusChannel = connection.createChannel(); stateBusChannel.queueDeclare("abcde", false, false, false, null); System.out.println("Started!"); new Thread() { @Override public void run() { try { receive(); } catch (IOException e) { restart("Could not connect", e); } catch (InterruptedException e) { restart("Error receiving data", e); } } }.start(); } catch (IOException e) { e.printStackTrace(); } } @Override public void onStop() { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } private void receive() throws IOException, InterruptedException { QueueingConsumer consumer = new QueueingConsumer(stateBusChannel); stateBusChannel.basicConsume(QUEUE_NAME, true, consumer); List<String> messages= Lists.newArrayList(); while (!isStopped()) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); messages.add(message); if(messages.size()>5){ store(messages.iterator()); messages.clear(); } } restart("Trying to connect again"); } } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-not-getting-generated-tp15439.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org