Hi team, I am new to Spark Streaming. I am trying to write a Spark Streaming application where the calculation on the incoming data is performed in R in each micro-batch.
I want the wordCounts.mapToPair step to run in parallel, where wordCounts is the output of groupByKeyAndWindow. How can I ensure that wordCounts.mapToPair runs fully in parallel, so that RUtilMethods.sum(inputToR) is invoked in parallel?

Note: I cannot use reduceByKey or combineByKey, because calling R multiple times per key would add significant overhead.

Thanks!

////////////////Code Sample////////////////////
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public final class JavaNetworkWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    // Create the context with a 10 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
    sparkConf.setMaster("local[4]");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    // Create a JavaReceiverInputDStream on target ip:port and read the
    // input stream of \n delimited text (e.g. generated by 'nc').
    // Note that storage without replication is only suitable for running locally;
    // replication is necessary in a distributed scenario for fault tolerance.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
        "localhost", 10000, StorageLevels.MEMORY_AND_DISK_SER);

    // Split each line into words (Spark 1.x FlatMapFunction returns an Iterable)
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });

    // Pair each word with 1, then collect all values per word over a 60 second window
    JavaPairDStream<String, Iterable<Integer>> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).groupByKeyAndWindow(Durations.seconds(60));

    // For each word, pass all of its grouped values to R in a single call
    JavaPairDStream<String, Integer> wordCounts1 = wordCounts.mapToPair(
        new PairFunction<Tuple2<String, Iterable<Integer>>, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(Tuple2<String, Iterable<Integer>> data) throws Exception {
            // Copy the grouped values into a primitive int[] for R
            List<Integer> values = new ArrayList<Integer>();
            for (Integer v : data._2()) {
              values.add(v);
            }
            int[] inputToR = new int[values.size()];
            for (int i = 0; i < inputToR.length; i++) {
              inputToR[i] = values.get(i);
            }
            return new Tuple2<String, Integer>(data._1(), RUtilMethods.sum(inputToR));
          }
        });

    wordCounts1.print();
    ssc.start();
    ssc.awaitTermination();
  }
}
///////////////////////////////////////////////////////////
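The body of the second mapToPair depends only on the single (word, values) pair it receives, which is what allows Spark to apply it independently inside each partition of the grouped RDD. How many of those calls actually run at the same time is bounded by the number of partitions of the grouped stream and by the free cores; with local[4], the socket receiver permanently occupies one of the four threads. Below is a minimal, Spark-free sketch of that per-key logic so it can be checked in isolation. It assumes a hypothetical `sumInR` method as a stand-in for `RUtilMethods.sum` (whose source is not shown); the class name `PerKeySketch` and the helpers `toIntArray` and `processGroup` are also illustrative only.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public final class PerKeySketch {

  // Hypothetical stand-in for RUtilMethods.sum: the real method calls into R,
  // here the values are simply added in plain Java so the sketch runs on its own.
  static int sumInR(int[] values) {
    int total = 0;
    for (int v : values) {
      total += v;
    }
    return total;
  }

  // Copies the grouped values for one key into a primitive array,
  // the same shape the question passes to R as inputToR.
  static int[] toIntArray(Iterable<Integer> values) {
    List<Integer> buffer = new ArrayList<Integer>();
    for (Integer v : values) {
      buffer.add(v);
    }
    int[] out = new int[buffer.size()];
    for (int i = 0; i < out.length; i++) {
      out[i] = buffer.get(i);
    }
    return out;
  }

  // Per-key function: it reads nothing but its own (word, values) pair,
  // so different keys can be handled by different tasks without coordination.
  static Map.Entry<String, Integer> processGroup(String word, Iterable<Integer> values) {
    return new AbstractMap.SimpleImmutableEntry<String, Integer>(word, sumInR(toIntArray(values)));
  }

  public static void main(String[] args) {
    Map.Entry<String, Integer> result = processGroup("spark", Arrays.asList(1, 1, 1));
    System.out.println(result.getKey() + " -> " + result.getValue()); // prints "spark -> 3"
  }
}
```

Because `processGroup` keeps no shared state, R is still called exactly once per key per window, which matches the constraint stated in the note above about avoiding reduceByKey and combineByKey.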