Re: GroupByKey causing problem

2015-02-26 Thread Imran Rashid
Hi Tushar,

The most scalable option is probably for you to consider doing some
approximation.  E.g., sample the data first to come up with the bucket
boundaries.  Then you can assign data points to buckets without needing to
do a full groupByKey.  You could even add more passes which correct any
errors in your approximation (e.g., see how sortByKey() works, and how it
samples the underlying RDD when constructing the RangePartitioner).  Though
it's more passes through the data, it will probably be much faster since you
avoid the expensive groupByKey().
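[A minimal sketch of the sampling idea above, in plain Scala. It assumes a small sample has already been drawn (e.g. with RDD.sample) and collected to the driver; the function and parameter names here are illustrative, not from Spark's API.]

```scala
// Derive approximate bucket boundaries from a sorted sample, then assign any
// value to a bucket with a binary search -- no groupByKey needed.
def boundariesFromSample(sample: Array[Double], numBuckets: Int): Array[Double] = {
  val sorted = sample.sorted
  // Evenly spaced quantiles of the sample become the bucket boundaries.
  (1 until numBuckets).map(i => sorted(i * sorted.length / numBuckets)).toArray
}

def bucketOf(boundaries: Array[Double], v: Double): Int = {
  val i = java.util.Arrays.binarySearch(boundaries, v)
  if (i >= 0) i else -i - 1 // negative result encodes the insertion point
}
```

[Applying bucketOf inside a map over the full RDD replaces the shuffle-heavy groupByKey with a broadcast of the small boundaries array, which is essentially what RangePartitioner does for sortByKey.]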

Imran

On Thu, Feb 26, 2015 at 3:38 AM, Tushar Sharma tushars...@gmail.com wrote:

 Hi,

 I am trying to apply binning to a large CSV dataset. Here are the steps I
 am taking:

 1. Emit each value of CSV as (ColIndex,(RowIndex,value))
 2. Then I groupByKey (here ColumnIndex) and get all values of a particular
 index to one node, as I have to work on the collection of all values
 3. I apply my binning algorithm which is as follows:
 a. Sort the values
 b. Iterate through the values and check whether each is different from the
 previous one:
 if no, then add it to the same bin
 if yes, then check the size of that bin; if it is greater than a
 particular size (say 5% of the whole dataset) then change the bin
 number, else keep the same bin
 c. repeat for each column
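[The steps above, for a single column, can be sketched as a small pure function. This is an illustrative restatement, assuming the column's values fit locally once grouped; equal values always share a bin, and a bin only closes once it holds at least maxFrac of the data.]

```scala
// Assign a bin id to each value of one (sorted) column.
def bin(values: Array[Double], maxFrac: Double = 0.05): Array[Int] = {
  val sorted = values.sorted
  val n = sorted.length
  var binId = 0
  var binSize = 0
  val out = new Array[Int](n)
  for (i <- 0 until n) {
    val sameAsPrev = i > 0 && sorted(i) == sorted(i - 1)
    // A new distinct value starts a new bin only when the current bin is full.
    if (!sameAsPrev && binSize.toDouble / n >= maxFrac) {
      binId += 1
      binSize = 0
    }
    binSize += 1
    out(i) = binId
  }
  out
}
```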

 Because of this algorithm I can't compute the result partition-wise and
 merge the partial results. But even so I expect groupByKey to work, maybe
 slowly, but it should finish. I increased the number of partitions to
 reduce the output of each groupByKey, hoping that would help the process
 complete. But even with that it gets stuck at the same stage. The executor
 log says:

 ExternalAppendOnlyMap (spilling to disk) (Trying ...)

 The code works for small CSV files but can't complete for big files.

 val inputfile = "hdfs://hm41:9000/user/file1"
 val table = sc.textFile(inputfile, 1000)

 val withoutHeader: RDD[String] = dropHeader(table)

 val kvPairs = withoutHeader.flatMap(retAtrTuple)

 //val filter_na = kvPairs.map{case (x,y) => (x, if (y == "NA") "" else y)}

 // A column counts as numeric only if every value in it parses as a number.
 val isNum = kvPairs.map{case (x,y) => (x, isNumeric(y))}.reduceByKey(_ && _)

 val numeric_indexes = isNum.filter{case (x,y) => y}.sortByKey().map{case
 (x,y) => x}.collect()
 //val isNum_Arr = isNum.sortByKey().collect()

 val kvidx = withoutHeader.zipWithIndex
 //val t = kvidx.map{case (a,b) => retAtrTuple(a).map(x => (x,b)) }

 val t = kvidx.flatMap{case (a,b) => retAtrTuple(a).map(x => (x,b)) }
 val t2 = t.filter{case (a,b) => numeric_indexes contains a._1 }

 //val t2 = t.filter{case (a,b) => a._1 == 0 }
 val t3 = t2.map{case ((a,b),c) => (a, (c, b.toDouble))}
 //val t4 = t3.sortBy(_._2._1)
 val t4 = t3.groupByKey.map{case (a,b) =>
   (a, classing_summary(b.toArray.sortBy(_._2)))}

 def dropHeader(data: RDD[String]): RDD[String] = {
   data.mapPartitionsWithIndex((idx, lines) =>
     // the CSV header lives in the first partition
     if (idx == 0) lines.drop(1) else lines
   )
 }

 def retAtrTuple(x: String) = {
   val newX = x.split(',')
   for (h <- 0 until newX.length)
     yield (h, newX(h))
 }

 // needs: import scala.util.control.Exception.allCatch
 def isNumeric(s: String): Boolean = {
   (allCatch opt s.toDouble).isDefined
 }

 def classing_summary(arr: Array[(Long, Double)]) = {
   var idx = 0L
   var value = 0.0
   var prevValue = Double.MinValue
   var counter = 1
   var classSize = 0.0
   val size = arr.length

   val output = for (i <- 0 until arr.length) yield {
     idx = arr(i)._1
     value = arr(i)._2
     if (value == prevValue) {
       // same value as the previous one: always stays in the current bin
       classSize += 1.0 / size
       prevValue = value
       (idx, value, counter, classSize)
     } else if (classSize < 0.05) {
       // new value, but the current bin is still under 5% of the data
       classSize += 1.0 / size
       prevValue = value
       (idx, value, counter, classSize)
     } else {
       // new value and the current bin is full: start a new bin
       classSize = 1.0 / size
       counter += 1
       prevValue = value
       (idx, value, counter, classSize)
     }
   }
   output.toArray
 }

 Thanks in advance,

 Tushar Sharma


