Thanks for reply. Please find sudo code below. Its Dstreams reading for every
10secs from kinesis stream and after transformations, pushing into hbase.
Once got Dstream, we are using below code to repartition and do processing:

dStream = dStream.repartition(javaSparkContext.defaultMinPartitions() * 3);
dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords ->
{
   Connection hbaseConnection= ConnectionUtil.getHbaseConnection();
   List<byte[]> listOfRecords = new ArrayList<>();
   while (partitionOfRecords.hasNext()) {
         listOfRecords.add(partitionOfRecords.next());

         if (listOfRecords.size() < 10 && partitionOfRecords.hasNext())
                            continue;
         
         List<byte[]> finalListOfRecords = listOfRecords;
         doJob(finalListOfRecords, hbaseConnection);
         listOfRecords = new ArrayList<>();
   }
}));


We are batching every 10 records and pass to doJob method where we batch
process and bulk insert to hbase.

With above code, will it be able to tell what is happening at job 1 -> 6
tasks? and how to replace repartition method efficiently.

Thanks in Advance



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to