Hi,
I am new to Spark. I have written a custom RabbitMQ receiver which calls the
store method of the Receiver interface, and I can see that the block is being
stored. I am trying to process each RDD in the DStream using the foreachRDD
function, but I am unable to figure out why the foreachRDD callback is never
invoked with a populated RDD, even after the data is stored by the receiver.

/This code initialises the stream:/

 public void testListeningToStateChannel() throws IOException {

        // Create the context with a 1 second batch size
        SparkConf sparkConf = new
SparkConf().setAppName("JavaCustomReceiver");

        sparkConf.setMaster("local");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
Duration(1000));

        JavaDStream<String> customReceiverStream = ssc.receiverStream(new
StateBusListener("abcde"));

        customReceiverStream.foreachRDD(doSomething);
        
        customReceiverStream.print();
        ssc.start();
        System.out.println("started :");
        ssc.awaitTermination();

    }

    public static Function2<JavaRDD&lt;String>, Time, Void> doSomething =
new Function2<JavaRDD&lt;String>, Time, Void>() {
        @Override
        public Void call(JavaRDD<String> v1, Time v2) throws Exception {
            System.out.println(v1.toString());
            if (!v1.collect().isEmpty()) {
                long count = v1.count();
                System.out.println("found :" + count);
            }
            return null;
        }
    };

/This is the receiver :/
/**
 * Custom Spark Streaming receiver that consumes String messages from a
 * RabbitMQ queue on localhost and hands them to Spark via store() in
 * batches of six.
 */
public class StateBusListener extends Receiver<String> {
    // Queue to consume from; per-instance configuration, not a constant.
    private final String queueName;
    private Channel stateBusChannel;
    private Connection connection = null;

    /**
     * @param queueName name of the RabbitMQ queue to declare and consume
     */
    public StateBusListener(String queueName) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.queueName = queueName;
    }

    @Override
    public void onStart() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            connection = factory.newConnection();
            stateBusChannel = connection.createChannel();
            // BUG FIX: the original declared the hard-coded queue "abcde"
            // but consumed from the configured queue name; use the
            // configured name for both so they always agree.
            stateBusChannel.queueDeclare(queueName, false, false, false, null);
            System.out.println("Started!");
            // Consume on a separate thread so onStart() returns promptly,
            // as the Receiver contract requires.
            new Thread() {
                @Override
                public void run() {
                    try {
                        receive();
                    } catch (IOException e) {
                        restart("Could not connect", e);
                    } catch (InterruptedException e) {
                        // Preserve interrupt status for this thread's owner.
                        Thread.currentThread().interrupt();
                        restart("Error receiving data", e);
                    }
                }
            }.start();
        } catch (IOException e) {
            // BUG FIX: the original swallowed this with printStackTrace(),
            // leaving a dead receiver with no retry. Ask Spark to restart
            // the receiver instead.
            restart("Could not open RabbitMQ connection", e);
        }
    }

    @Override
    public void onStop() {
        try {
            // Guard against onStop() arriving before a connection was made.
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Blocking consume loop: buffers messages and stores them in Spark six
     * at a time until the receiver is stopped.
     *
     * @throws IOException on channel failure
     * @throws InterruptedException if the blocking delivery wait is interrupted
     */
    private void receive() throws IOException, InterruptedException {
        QueueingConsumer consumer = new QueueingConsumer(stateBusChannel);
        stateBusChannel.basicConsume(queueName, true, consumer);
        List<String> messages = Lists.newArrayList();

        while (!isStopped()) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // NOTE(review): uses the platform default charset; if the
            // producer writes UTF-8, pass an explicit charset — TODO confirm.
            String message = new String(delivery.getBody());
            messages.add(message);
            if (messages.size() > 5) {
                store(messages.iterator());
                messages.clear();
            }
        }
        // BUG FIX: flush any partial batch (fewer than 6 messages) before
        // restarting; the original silently dropped them.
        if (!messages.isEmpty()) {
            store(messages.iterator());
        }
        restart("Trying to connect again");
    }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-not-getting-generated-tp15439.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to