Re: Distinct is very slow

2015-04-23 Thread Jeetendra Gangele
Anyone, any thoughts on this?

On 22 April 2015 at 22:49, Jeetendra Gangele gangele...@gmail.com wrote:

 I made 7000 tasks in mapToPair, and in distinct I also used the same number
 of tasks. There is still a lot of shuffle read and write, which makes the
 application run much longer.
 Any idea?

 On 17 April 2015 at 11:55, Akhil Das ak...@sigmoidanalytics.com wrote:

 How many tasks are you seeing in your mapToPair stage? Is it 7000? Then I
 suggest giving a number similar/close to 7000 in your .distinct call. What is
 happening in your case is that you are repartitioning your data down to a
 smaller number (32), which I believe puts a lot of load on the processing;
 you can try increasing it.

 Thanks
 Best Regards

 On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Akhil, any thought on this?

 On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com
 wrote:

 No, I did not try the partitioning. The full code is below.

 [full code snipped; see the complete listing in the 2015-04-16 message later
 in this thread]

Re: Distinct is very slow

2015-04-17 Thread Akhil Das
How many tasks are you seeing in your mapToPair stage? Is it 7000? Then I
suggest giving a number similar/close to 7000 in your .distinct call. What is
happening in your case is that you are repartitioning your data down to a
smaller number (32), which I believe puts a lot of load on the processing;
you can try increasing it.
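
For example, something like this (a sketch, untested; it assumes Java 8
lambdas and reuses the blockingRdd/ckdao names from your code):

JavaPairRDD<Long, Integer> completeDataToprocess =
    blockingRdd
        .flatMapValues(v1 -> ckdao.getSingelkeyresult(v1))
        // keep the partition count close to the parent stage's ~7000 tasks
        // instead of collapsing everything to 32
        .distinct(7000);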

Thanks
Best Regards

On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Akhil, any thought on this?

 On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com wrote:

 No, I did not try the partitioning. The full code is below.

 [full code snipped; see the complete listing in the 2015-04-16 message later
 in this thread]


 On 16 April 2015 at 22:25, Akhil Das ak...@sigmoidanalytics.com wrote:

 Can you paste your complete code? Did you try repartitioning/increasing the
 level of parallelism to speed up the processing? Since you have 16 cores,
 and I'm 

Re: Distinct is very slow

2015-04-17 Thread Jeetendra Gangele
I am saying to partition with something like partitionBy(new
HashPartitioner(16)). Will this not work?
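
For reference, a minimal sketch of that (it assumes the blockingRdd pair RDD
from the code earlier in this thread):

import org.apache.spark.HashPartitioner;

// hash-partition the (Long, String) pairs into 16 partitions up front
JavaPairRDD<Long, String> partitioned =
    blockingRdd.partitionBy(new HashPartitioner(16));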

On 17 April 2015 at 21:28, Jeetendra Gangele gangele...@gmail.com wrote:

 I have given 3000 tasks to mapToPair, and now it is taking so much memory
 and wasting time shuffling. Here are the stats from a run with very small
 data; it is shuffling almost all of the data. Not sure what is happening
 here; any idea?


- Total task time across all tasks: 11.0 h
- Shuffle read: 153.8 MB
- Shuffle write: 288.0 MB


 On 17 April 2015 at 14:32, Jeetendra Gangele gangele...@gmail.com wrote:

 mapToPair is running with 32 tasks but is very slow because of a lot of
 shuffle read (attaching a screenshot). Each task runs for about 10 mins,
 even though inside the function I am not doing anything costly.


Re: Distinct is very slow

2015-04-16 Thread Akhil Das
Can you paste your complete code? Did you try repartitioning/increasing the
level of parallelism to speed up the processing? Since you have 16 cores,
and I'm assuming your 400k records isn't bigger than a 10G dataset.

Thanks
Best Regards

On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 I already checked, and GC is taking 1 sec for each task. Is this too much?
 If yes, how do I avoid it?


 On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote:

 Open the driver UI and see which stage is taking time; you can look at
 whether it's adding any GC time etc.

 Thanks
 Best Regards

 On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Hi All, I have the code below where distinct runs for a long time.

 blockingRdd is a pair RDD of (Long, String) and will have 400K records:
 JavaPairRDD<Long, Integer> completeDataToprocess =
     blockingRdd.flatMapValues(new Function<String, Iterable<Integer>>() {
   @Override
   public Iterable<Integer> call(String v1) throws Exception {
     return ckdao.getSingelkeyresult(v1);
   }
 }).distinct(32);

 I am running distinct on 800K records, and it takes 2 hours on 16 cores and
 20 GB RAM.

Distinct is very slow

2015-04-16 Thread Jeetendra Gangele
Hi All, I have the code below where distinct runs for a long time.

blockingRdd is a pair RDD of (Long, String) and will have 400K records:
JavaPairRDD<Long, Integer> completeDataToprocess =
    blockingRdd.flatMapValues(new Function<String, Iterable<Integer>>() {
      @Override
      public Iterable<Integer> call(String v1) throws Exception {
        // one candidate-id lookup per blocking key; distinct then dedupes
        return ckdao.getSingelkeyresult(v1);
      }
    }).distinct(32);

I am running distinct on 800K records, and it takes 2 hours on 16 cores and
20 GB RAM.
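
(For context on why distinct shuffles so much: distinct(n) boils down to a
reduceByKey over the whole data set, roughly like the sketch below. This is
an illustration assuming Java 8 lambdas, not the literal Spark source.)

JavaRDD<Tuple2<Long, Integer>> deduped = completeDataToprocess
    .mapToPair(t -> new Tuple2<Tuple2<Long, Integer>, Integer>(t, 1)) // key by the whole pair
    .reduceByKey((a, b) -> a, 32) // the 32 is what distinct(32) passes down
    .map(t -> t._1()); // unwrap; duplicates are gone after the shuffle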


Re: Distinct is very slow

2015-04-16 Thread Akhil Das
Open the driver UI and see which stage is taking time; you can look at
whether it's adding any GC time etc.

Thanks
Best Regards

On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Hi All, I have the code below where distinct runs for a long time.

 blockingRdd is a pair RDD of (Long, String) and will have 400K records:
 JavaPairRDD<Long, Integer> completeDataToprocess =
     blockingRdd.flatMapValues(new Function<String, Iterable<Integer>>() {
   @Override
   public Iterable<Integer> call(String v1) throws Exception {
     return ckdao.getSingelkeyresult(v1);
   }
 }).distinct(32);

 I am running distinct on 800K records, and it takes 2 hours on 16 cores and
 20 GB RAM.



Re: Distinct is very slow

2015-04-16 Thread Jeetendra Gangele
I already checked, and GC is taking 1 sec for each task. Is this too much?
If yes, how do I avoid it?
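
(One way to pin down the GC cost is to turn on GC logging on the executors
via spark.executor.extraJavaOptions; the JVM flags below are standard ones,
but whether GC is really the bottleneck here is an assumption:)

spark-submit \
  --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  ...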

On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote:

 Open the driver UI and see which stage is taking time; you can look at
 whether it's adding any GC time etc.

 Thanks
 Best Regards

 On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Hi All, I have the code below where distinct runs for a long time.

 blockingRdd is a pair RDD of (Long, String) and will have 400K records:
 JavaPairRDD<Long, Integer> completeDataToprocess =
     blockingRdd.flatMapValues(new Function<String, Iterable<Integer>>() {
   @Override
   public Iterable<Integer> call(String v1) throws Exception {
     return ckdao.getSingelkeyresult(v1);
   }
 }).distinct(32);

 I am running distinct on 800K records, and it takes 2 hours on 16 cores and
 20 GB RAM.

Re: Distinct is very slow

2015-04-16 Thread Jeetendra Gangele
No, I did not try the partitioning. Below is the full code:

public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, JavaSparkContext jsc)
    throws IOException {
  long start = System.currentTimeMillis();

  // Index every vendor record and convert it to matcher-relevant data
  JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd.zipWithIndex()
      .mapToPair(new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {
        @Override
        public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t)
            throws Exception {
          MatcherReleventData matcherData = new MatcherReleventData();
          return new Tuple2<Long, MatcherReleventData>(t._2(),
              matcherData.convertVendorDataToMatcherData(t._1()));
        }
      }).cache();
  log.info("after index " + RddForMarch.take(1));

  // Collect the whole RDD to the driver and broadcast it as a lookup map
  // (only viable while the data set fits in driver memory)
  Map<Long, MatcherReleventData> tmp = RddForMarch.collectAsMap();
  Map<Long, MatcherReleventData> matchData =
      new HashMap<Long, MatcherReleventData>(tmp);
  final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
      jsc.broadcast(matchData);

  // Expand each record into its non-null blocking keys
  JavaPairRDD<Long, String> blockingRdd = RddForMarch.flatMapValues(
      new Function<MatcherReleventData, Iterable<String>>() {
        @Override
        public Iterable<String> call(MatcherReleventData v1) throws Exception {
          List<String> values = new ArrayList<String>();
          HelperUtilities helper1 = new HelperUtilities();
          MatcherKeys matchkeys = helper1.getBlockinkeys(v1);
          if (matchkeys.get_companyName() != null) {
            values.add(matchkeys.get_companyName());
          }
          if (matchkeys.get_phoneNumberr() != null) {
            values.add(matchkeys.get_phoneNumberr());
          }
          if (matchkeys.get_zipCode() != null) {
            values.add(matchkeys.get_zipCode());
          }
          if (matchkeys.getM_domain() != null) {
            values.add(matchkeys.getM_domain());
          }
          return values;
        }
      });
  log.info("blocking RDD is " + blockingRdd.count());
  int count = 0;
  log.info("Starting printing");
  for (Tuple2<Long, String> entry : blockingRdd.collect()) {
    log.info(entry._1() + " : " + entry._2());
    count++;
  }
  log.info("total count " + count);

  // Look up candidate ids for every blocking key, then dedupe into 32 partitions
  JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
      new Function<String, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> call(String v1) throws Exception {
          return ckdao.getSingelkeyresult(v1);
        }
      }).distinct(32);
  log.info("after hbase count is " + completeDataToprocess.count());
  log.info("data for process " + completeDataToprocess.take(1));

  // Score every (record, candidate) pair against the broadcast lookup map
  JavaPairRDD<Long, Tuple2<Integer, Double>> withScore = completeDataToprocess.mapToPair(
      new PairFunction<Tuple2<Long, Integer>, Long, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t)
            throws Exception {
          Scoring scoreObj = new Scoring();
          double score = scoreObj.computeMatchScore(companyDAO.get(t._2()),
              dataMatchGlobal.getValue().get(t._1()));
          return new Tuple2<Long, Tuple2<Integer, Double>>(t._1(),
              new Tuple2<Integer, Double>(t._2(), score));
        }
      });
  log.info("with score tuple is " + withScore.take(1));

  // Keep only the highest-scoring candidate per record
  JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD = withScore.reduceByKey(
      new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
            Tuple2<Integer, Double> v2) throws Exception {
          // keep the tuple with the larger score (ties resolve to v2)
          return v1._2().compareTo(v2._2()) > 0 ? v1 : v2;
        }
      });
  log.info("max score RDD " + maxScoreRDD.take(10));

  maxScoreRDD.foreach(new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {
    @Override
    public void call(Tuple2<Long, Tuple2<Integer, Double>> t) throws Exception {
      MatcherReleventData matchedData = dataMatchGlobal.getValue().get(t._1());
      log.info("broadcast is " + dataMatchGlobal.getValue().get(t._1()));
      // Set the score for better understanding of the merge
      matchedData.setScore(t._2()._2());
      vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(), Souce_id);
    }
  });
  log.info("took " + (System.currentTimeMillis() - start) + " mills to run matcher");
}


On 16 April 2015 at 22:25, Akhil Das ak...@sigmoidanalytics.com wrote:

 Can you paste your complete code? Did you try repartitioning/increasing the
 level of parallelism to speed up the processing? Since you have 16 cores,
 and I'm assuming your 400k records isn't bigger than a 10G dataset.

 Thanks
 Best Regards

 On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 I already checked, and GC is taking 1 sec for each task. Is this too much?
 If yes, how do I avoid it?


 On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote:

 Open the driver UI and see which stage is taking time; you can look at
 whether it's adding any GC time etc.

 Thanks
 Best Regards

 On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele gangele...@gmail.com
  wrote:

 Hi All, I have the code below where distinct runs for a long time.

 blockingRdd is the combination 

Re: Distinct is very slow

2015-04-16 Thread Jeetendra Gangele
Akhil, any thought on this?

On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com wrote:

 No, I did not try the partitioning. The full code is below.

 [full code snipped; see the complete listing in the 2015-04-16 message
 earlier in this thread]


 On 16 April 2015 at 22:25, Akhil Das ak...@sigmoidanalytics.com wrote:

 Can you paste your complete code? Did you try repartitioning/increasing the
 level of parallelism to speed up the processing? Since you have 16 cores,
 and I'm assuming your 400k records isn't bigger than a 10G dataset.

 Thanks
 Best Regards

 On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele gangele...@gmail.com
  wrote:

 I already checked, and GC is taking 1 sec for each task. Is this too much?
 If yes, how do I avoid it?


 On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote:

 Open the driver UI and see which stage is taking time; you can look at
 whether it's adding 

Re: Distinct is very slow

2015-04-16 Thread Jeetendra Gangele
At the distinct level I will have 7000 times more elements in my RDD, so
should I repartition? Its parent will definitely have fewer partitions. How
do I see the number of partitions through Java code?
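
(A quick way to read partition counts from Java; a sketch reusing the RDD
names from the code earlier in this thread:)

int parentPartitions = blockingRdd.partitions().size();
int afterDistinct = completeDataToprocess.partitions().size();
log.info("parent=" + parentPartitions + ", after distinct=" + afterDistinct);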


On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com wrote:

 No, I did not try the partitioning. The full code is below.

 [full code snipped; see the complete listing in the 2015-04-16 message
 earlier in this thread]


 On 16 April 2015 at 22:25, Akhil Das ak...@sigmoidanalytics.com wrote:

 Can you paste your complete code? Did you try repartitioning/increasing the
 level of parallelism to speed up the processing? Since you have 16 cores,
 and I'm assuming your 400k records isn't bigger than a 10G dataset.

 Thanks
 Best Regards

 On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele gangele...@gmail.com
  wrote:

 I already checked, and GC is taking 1 sec for each task. Is this too much?
 If yes, how to avoid