Thanks for the reply. Please find pseudocode below. A DStream reads from a Kinesis stream every 10 seconds and, after transformations, pushes the records into HBase. Once we get the DStream, we use the code below to repartition and do the processing:
dStream = dStream.repartition(javaSparkContext.defaultMinPartitions() * 3);
dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords -> {
    Connection hbaseConnection = ConnectionUtil.getHbaseConnection();
    List<byte[]> listOfRecords = new ArrayList<>();
    while (partitionOfRecords.hasNext()) {
        listOfRecords.add(partitionOfRecords.next());
        // Keep accumulating until we have 10 records or the partition is exhausted.
        if (listOfRecords.size() < 10 && partitionOfRecords.hasNext()) {
            continue;
        }
        doJob(listOfRecords, hbaseConnection);
        listOfRecords = new ArrayList<>();
    }
}));

We batch every 10 records and pass them to the doJob method, which batch-processes and bulk-inserts them into HBase. With the above code, can you tell what is happening at job 1 -> 6 tasks? And how can we replace the repartition call more efficiently?

Thanks in advance.
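For what it's worth, the per-partition batching loop above can be pulled out and exercised on its own, independently of Spark and HBase. The sketch below is a minimal, self-contained version of that loop; the class name BatchingSketch and the toBatches helper are illustrative, not part of the original code.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchingSketch {
    static final int BATCH_SIZE = 10;

    // Drains an iterator into batches of at most BATCH_SIZE records,
    // mirroring the while-loop inside the foreachPartition lambda:
    // a batch is emitted once it reaches 10 records, or when the
    // partition iterator runs out (the final, possibly smaller, batch).
    static List<List<byte[]>> toBatches(Iterator<byte[]> partitionOfRecords) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> batch = new ArrayList<>(BATCH_SIZE);
        while (partitionOfRecords.hasNext()) {
            batch.add(partitionOfRecords.next());
            if (batch.size() == BATCH_SIZE || !partitionOfRecords.hasNext()) {
                batches.add(batch);
                batch = new ArrayList<>(BATCH_SIZE);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        // 25 records should yield batches of 10, 10, and 5.
        List<byte[]> records = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            records.add(new byte[] {(byte) i});
        }
        List<List<byte[]>> batches = toBatches(records.iterator());
        System.out.println(batches.size());
        System.out.println(batches.get(2).size());
    }
}

Isolating the batching this way makes it easy to unit-test the flush condition (size reached vs. iterator exhausted) without spinning up a streaming context.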