Hi Team,

I am new to Spark Streaming. I am trying to write a Spark Streaming 
application in which the computation on the incoming data is performed in R 
for each micro-batch.

I want wordCounts.mapToPair to run in parallel, where wordCounts is the output 
of groupByKeyAndWindow, so that RUtilMethods.sum(inputToR) is invoked in 
parallel for the different keys. How can I ensure this parallelism?
Note: I cannot use reduceByKey or combineByKey, because calling into R 
multiple times per key would add significant overhead.
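
For example, would passing an explicit partition count to the window be the 
right knob? Below is only a rough sketch of what I have in mind (the partition 
count of 8 is arbitrary, and I set the slide duration equal to my 10-second 
batch interval just so I can use the three-argument overload); please correct 
me if this is not the recommended approach.

    // Sketch only: same pipeline as in the sample below, but using the
    // groupByKeyAndWindow overload that also takes a slide duration and a
    // number of partitions, hoping that the subsequent per-key mapToPair
    // (and hence the R call) runs in more tasks in parallel.
    // "words" is the JavaDStream<String> from the code sample below;
    // 8 partitions is an arbitrary choice for illustration.
    JavaPairDStream<String, Iterable<Integer>> wordCounts = words
            .mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            })
            .groupByKeyAndWindow(Durations.seconds(60), Durations.seconds(10), 8);

My understanding is that the number of partitions of the grouped DStream 
determines how many tasks run the mapToPair stage, but I am not sure that is 
correct.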
Thanks!

////////////////Code Sample////////////////////
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang3.ArrayUtils; // or org.apache.commons.lang.ArrayUtils, depending on the commons-lang version
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.collect.Lists;

import scala.Tuple2;

public final class JavaNetworkWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {

        // Create the context with a 10-second batch interval
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
        sparkConf.setMaster("local[4]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        // Create a JavaReceiverInputDStream on the target ip:port and count the
        // words in the input stream of \n-delimited text (e.g. generated by 'nc').
        // No replication in the storage level is fine only when running locally;
        // replication is necessary in a distributed scenario for fault tolerance.
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 10000,
                StorageLevels.MEMORY_AND_DISK_SER);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        // Emit (word, 1) pairs and group them per word over a 60-second window
        JavaPairDStream<String, Iterable<Integer>> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).groupByKeyAndWindow(Durations.seconds(60));

        // For each word, hand the grouped values to R as an int[] and sum them there
        // (RUtilMethods is my own helper that calls into R). This is the step I want
        // to run in parallel.
        JavaPairDStream<String, Integer> wordCounts1 = wordCounts.mapToPair(
                new PairFunction<Tuple2<String, Iterable<Integer>>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Iterable<Integer>> data) throws Exception {
                        List<Integer> it = IteratorUtils.toList(data._2.iterator());
                        int[] inputToR = ArrayUtils.toPrimitive(it.toArray(new Integer[0]));
                        it = null;
                        Runtime.getRuntime().gc();
                        return new Tuple2<String, Integer>(data._1, RUtilMethods.sum(inputToR));
                    }
                });

        wordCounts1.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
///////////////////////////////////////////////////////////
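
RUtilMethods is my own helper that hands the int[] to R and returns the result. 
For anyone who wants to run the sample without an R installation, a trivial 
stand-in that just sums in plain Java would look like this (this is only a test 
stub, not my real R call):

    // Test-only stand-in for the RUtilMethods helper referenced above.
    // The real version forwards the array to R; this stub just sums in Java
    // so the streaming pipeline can be exercised without R.
    public final class RUtilMethods {
        public static int sum(int[] values) {
            int total = 0;
            for (int v : values) {
                total += v;
            }
            return total;
        }
    }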
