Github user Srinivasan2Nagarajan commented on the issue:

    https://github.com/apache/spark/pull/10803
  
    Hi yuo,
    I have used this class in my code, but it does not give the output I
    expected: the values it produces are not continuous.
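
    From reading the code, my understanding is that WeightOfEvidence computes one
    WoE value per distinct category of the input column (roughly
    woe = log(p1 / p0) per category, with smoothing), so the output is a step
    function over the categories rather than a continuous value. A quick check of
    that assumption (the column names below match my code further down):

    // The number of distinct WoE values should match the number of distinct
    // input categories, not the number of rows.
    jk.select("Weight_Of_Evidence_Speed_FLM").distinct().count()
    jk.select("Speed_FLM").distinct().count()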
    
    val startTimeMillis = System.currentTimeMillis()
    val s1 = System.nanoTime()
    
    import org.apache.spark.sql.types.{ StringType, DoubleType, IntegerType}
    import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
    import org.apache.spark.mllib.evaluation.MulticlassMetrics
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.ml.feature.{ Tokenizer, HashingTF, IDF }
    import org.apache.spark.sql
    import org.apache.spark.rdd.RDD
    import org.apache.spark.annotation.Since
    import org.apache.spark.ml.Transformer
    import org.apache.spark.ml.feature.{ VectorAssembler, StandardScaler }
    import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
    import org.apache.spark.ml.util.Identifiable
    import org.apache.spark.sql.{DataFrame, Dataset, Column}
    import org.apache.spark.sql.functions._
    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    import org.apache.spark.ml.tuning.CrossValidator
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.mllib.linalg.{ Vector, Vectors, SparseVector }
    import scala.util.parsing.combinator._
    import org.apache.spark.ml.feature._
    import spark.implicits._   // spark is the active SparkSession used below
    import org.apache.spark.ml.feature.Bucketizer
    
    var dataDf = spark.sql("select (.75 * longacc_0to1point5 + 2.25 * 
longacc_1point5to3 + 3.75 * longacc_3to4point5 + 5.25 * longacc_4point5to6 + 
6.75 * longacc_6to7point5 + 8.25 * longacc_7point5to9 + 9.75 * 
longacc_9to10point5 + 11.25 * longacc_10point5to12 + 12.75 * 
longacc_12to13point5 + 14.5 * longacc_13point5)/( longacc_0to1point5 + 
longacc_1point5to3 + longacc_3to4point5 + longacc_4point5to6 + 
longacc_6to7point5 + longacc_7point5to9 + longacc_9to10point5 + 
longacc_10point5to12 + longacc_12to13point5 + longacc_13point5) as Long_Acc, 
((5 * Speed_FLM_0to10 + 15 * Speed_FLM_10to20 + 25 * Speed_FLM_20to30 + 40 * 
Speed_FLM_30to50 + 65 * Speed_FLM_50to80 + 100 * Speed_FLM_80to120+135 * 
Speed_FLM_120to150 + 175 * Speed_FLM_150to200 + 225 * Speed_FLM_200toInf)/( 
Speed_FLM_0to10 + Speed_FLM_10to20 + Speed_FLM_20to30+ Speed_FLM_30to50 + 
Speed_FLM_50to80 + Speed_FLM_80to120 + Speed_FLM_120to150 + Speed_FLM_150to200 
+ Speed_FLM_200toInf)) as Speed_FLM, ((1 * dist_0to2km + 3.5 * dist_2to5k
 m + 7.5 * dist_5to10km + 15 * dist_10to20km + 35 * dist_20to50km + 100 * 
dist_50to150km + 175 * dist_150to500km + 600 * dist_500km)/( dist_0to2km + 
dist_2to5km + dist_5to10km + dist_10to20km + dist_20to50km + dist_50to150km + 
dist_150to500km + dist_500km)) as Dist, ((5 * Pedal_0to10perc + 15 * 
Pedal_10to20perc + 25 * Pedal_20to30perc + 35 * Pedal_30to40perc + 45 *  
Pedal_40to50perc + 55 * Pedal_50to60perc + 65* Pedal_60to70perc + 75 * 
Pedal_70to80perc + 85 *  Pedal_80to90perc + 95 *  Pedal_90perc)/ ( 
Pedal_0to10perc +  Pedal_10to20perc +  Pedal_20to30perc +  Pedal_30to40perc +  
Pedal_40to50perc +  Pedal_50to60perc +  Pedal_60to70perc +  Pedal_70to80perc +  
Pedal_80to90perc +  Pedal_90perc)) as Pedal, ((1 * time_1min + 3 *  
time_1to5min + 7.5 * time_5to10min + 20 * time_10to30min + 45 *  time_30to60min 
+ 90 * time_60to120min + 210 *  time_120to300min+400 *  time_300min)/( 
time_1min +  time_1to5min +  time_5to10min + time_10to30min + time_30to60min +  
time_60to120min +  time_120to300m
 in +  time_300min)) as Time, ((1 * Pauses_less_1min + 
5*Pauses_1_10min+20*Pauses_10_30min + 45 * Pauses_30_60min + 
90*Pauses_60_120min + 210 * Pauses_120_300min + 3750 * Pauses_300_24hr + 5040 * 
Pauses_24hr_1week + 21600 * Pauses_1week_1month+ 
43200*Pauses_greater_30days)/(Pauses_less_1min+ Pauses_1_10min+Pauses_10_30min 
+ Pauses_30_60min + Pauses_60_120min + Pauses_120_300min + Pauses_300_24hr + 
Pauses_24hr_1week + Pauses_1week_1month + Pauses_greater_30days)) as Pauses, 
((750 * Speed_RPM_1000 + 1250 * Speed_RPM_1000to1500+1750 * 
Speed_RPM_1500to2000 + 2250 * Speed_RPM_2000to2500+2750 * 
Speed_RPM_2500to3000+3250 * Speed_RPM_3000to3500 + 4750 * Speed_RPM_3500to4000 
+ 4250 * Speed_RPM_4000to4500 + 4750 * Speed_RPM_4500to5000+5250 * 
Speed_RPM_5000to5500 + 5750 * Speed_RPM_5500to6000 + 6250 * 
Speed_RPM_6000to6500 + 6750 * Speed_RPM_6500to7000 + 7250 * Speed_RPM_7000)/( 
Speed_RPM_1000 + Speed_RPM_1000to1500 + Speed_RPM_1500to2000 + 
Speed_RPM_2000to2500 + Speed_RPM_2500to3000 + Speed_RPM
 _3000to3500 + Speed_RPM_3500to4000 + Speed_RPM_4000to4500 + 
Speed_RPM_4500to5000 + Speed_RPM_5000to5500 + Speed_RPM_5500to6000 + 
Speed_RPM_6000to6500 + Speed_RPM_6500to7000 + Speed_RPM_7000)) as Speed_RPM, 
Speed_ReverseDirection, Speed_FLM_Standstill, Distance_Per_day, Starts_Per_Day, 
Driving_Time_Per_Day, Target from fisher_vin_data")
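
    (Each of the derived columns above collapses a histogram into one number: a
    weighted average of the form sum_i(w_i * count_i) / sum_i(count_i), where the
    w_i are the representative values I chose for each bucket.)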
    
    dataDf = dataDf.na.fill(0)
    
    println("Needed Output 1 ----> " + dataDf.count())
    
    def getMaximumValue(x: Column) =
      dataDf.agg(max(x)).collect().map(r => r.toSeq(0).asInstanceOf[Double]).toList(0)
    def getMinimumValue(x: Column) =
      dataDf.agg(min(x)).collect().map(r => r.toSeq(0).asInstanceOf[Double]).toList(0)
    
      // Min-max scale a column to [0, 1] using the global min/max of dataDf.
      def Normalize(x: Column): Column = {
        val max_x = getMaximumValue(x.cast("double"))
        val min_x = getMinimumValue(x.cast("double"))
        (x.cast("double") - min_x) / (max_x - min_x)
      }
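
    Side note: each Normalize call above triggers two separate aggregation jobs,
    one for the max and one for the min. A minimal sketch of fetching both in a
    single pass, assuming the same dataDf (the helper name minMax is mine):

    def minMax(x: Column): (Double, Double) = {
      // Single aggregation returning (min, max) for the column.
      val row = dataDf.agg(min(x.cast("double")), max(x.cast("double"))).head()
      (row.getDouble(0), row.getDouble(1))
    }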
    
    dataDf = dataDf.withColumn("Long_Acc", 
Normalize(dataDf.col("Long_Acc"))).withColumn("Speed_FLM", 
Normalize(dataDf.col("Speed_FLM"))).withColumn("Dist", 
Normalize(dataDf.col("Dist"))).withColumn("Pedal", 
Normalize(dataDf.col("Pedal"))).withColumn("Time", 
Normalize(dataDf.col("Time"))).withColumn("Pauses", 
Normalize(dataDf.col("Pauses"))).withColumn("Speed_RPM", 
Normalize(dataDf.col("Speed_RPM"))).withColumn("Speed_ReverseDirection", 
Normalize(dataDf.col("Speed_ReverseDirection"))).withColumn("Speed_FLM_Standstill",
 Normalize(dataDf.col("Speed_FLM_Standstill"))).withColumn("Distance_Per_day", 
Normalize(dataDf.col("Distance_Per_day"))).withColumn("Starts_Per_Day", 
Normalize(dataDf.col("Starts_Per_Day"))).withColumn("Driving_Time_Per_Day", 
Normalize(dataDf.col("Driving_Time_Per_Day"))).withColumn("Target", 
Normalize(dataDf.col("Target")))
    
    //------------------------------------------------------
    
    trait HasInputCol extends Params {
      final val inputCol: Param[String] = new Param[String](this, "inputCol", "input column name")
    
      final def getInputCol: String = $(inputCol)
    }
    
    trait HasLabelCol extends Params {
      final val labelCol: Param[String] = new Param[String](this, "labelCol", "label column name")
    
      setDefault(labelCol, "label")
    
      final def getLabelCol: String = $(labelCol)
    }
    
    trait HasOutputCol extends Params {
    
      final val outputCol: Param[String] = new Param[String](this, "outputCol", "output column name")
    
      setDefault(outputCol, uid + "__output")
    
      final def getOutputCol: String = $(outputCol)
    }
    
    class WeightOfEvidence(override val uid: String) extends HasInputCol with HasLabelCol with HasOutputCol {
    
      def this() = this(Identifiable.randomUID("woe"))
    
      def setInputCol(value: String): this.type = set(inputCol, value)
    
      def setLabelCol(value: String): this.type = set(labelCol, value)
    
      def setOutputCol(value: String): this.type = set(outputCol, value)
    
      def transform(dataset: DataFrame): DataFrame = {
        // validateParams()   ----- ***** Important ***** -----
        val sorted_dataset = dataset.sort($(inputCol))
        // One WoE value per distinct category of the input column.
        val woeTable = WeightOfEvidence.getWoeTable(sorted_dataset, $(inputCol), $(labelCol))
        val woeMap = woeTable.rdd.map { r =>
          val category = r.getAs[String]($(inputCol))
          val woe = r.getAs[Double]("woe")
          (category, woe)
        }.collectAsMap()
    
        // Categories without an entry in the WoE table end up as null in the output column.
        val trans = udf { (factor: String) => woeMap.get(factor) }
        dataset.withColumn($(outputCol), trans(col($(inputCol))))
      }
    
      override def copy(extra: ParamMap): WeightOfEvidence = defaultCopy(extra)
    }
    
    object WeightOfEvidence {
    
      def getWoeTable(dataset: DataFrame, categoryCol: String, labelCol: String): DataFrame = {
    
        val data = dataset.select(categoryCol, labelCol)
        val tmpTableName = "woe_temp"
        data.createOrReplaceTempView(tmpTableName)
        // Smoothing constant so empty buckets do not produce log(0) or division by zero.
        val err = 0.01
        // Per category: count of positive (label = 1) and negative (label = 0) rows.
        val query =
          s"""
             |SELECT
             |$categoryCol,
             |SUM (IF(CAST ($labelCol AS DOUBLE)=1, 1, 0)) AS 1count,
             |SUM (IF(CAST ($labelCol AS DOUBLE)=0, 1, 0)) AS 0count
             |FROM $tmpTableName
             |GROUP BY $categoryCol
            """.stripMargin
        val groupResult = data.sqlContext.sql(query).cache()
    
        val total0 = groupResult.selectExpr("SUM(0count)").first().getAs[Long](0).toDouble
        val total1 = groupResult.selectExpr("SUM(1count)").first().getAs[Long](0).toDouble
        // woe = log( (positives in category / all positives) / (negatives in category / all negatives) )
        groupResult.selectExpr(
          categoryCol,
          s"1count/$total1 AS p1",
          s"0count/$total0 AS p0",
          s"LOG(($err + 1count) / $total1 * $total0 / (0count + $err)) AS woe")
      }
    }
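
    A quick way to see what the transformer emits is to run it on a tiny
    hand-made DataFrame (the values below are made up for illustration, they are
    not from my real table):

    val toy = Seq(("A", "1"), ("A", "1"), ("A", "0"), ("B", "1"), ("B", "0"), ("B", "0"))
      .toDF("cat", "Target")
    // One row per distinct category with its p1, p0 and woe value.
    WeightOfEvidence.getWoeTable(toy, "cat", "Target").show()
    // Applying the transformer adds a column that takes only one value per category.
    new WeightOfEvidence().setInputCol("cat").setLabelCol("Target").setOutputCol("cat_woe")
      .transform(toy).show()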
    
     import org.apache.spark.sql.Row
    
    //  val test = dataDf.select("Speed_FLM").rdd.map {
    //        case Row(string_val: String) => (string_val, 
functionApplying(string_val, dataDf))
    //    }.toDF("Speed_FLM", "Speed_FLM_1")
    
    // test.show()
    
    
    
    // WeightOfEvidence reads the input column as a String, so cast the features first.
    var dataDf1 = dataDf
      .withColumn("Speed_FLM", dataDf("Speed_FLM").cast(StringType))
      .withColumn("Long_Acc", dataDf("Long_Acc").cast(StringType))
      .withColumn("Dist", dataDf("Dist").cast(StringType))
      .withColumn("Pedal", dataDf("Pedal").cast(StringType))
      .withColumn("Time", dataDf("Time").cast(StringType))
      .withColumn("Speed_RPM", dataDf("Speed_RPM").cast(StringType))
      .withColumn("Speed_ReverseDirection", dataDf("Speed_ReverseDirection").cast(StringType))
      .withColumn("Speed_FLM_Standstill", dataDf("Speed_FLM_Standstill").cast(StringType))
      .withColumn("Distance_Per_day", dataDf("Distance_Per_day").cast(StringType))
      .withColumn("Starts_Per_Day", dataDf("Starts_Per_Day").cast(StringType))
      .withColumn("Driving_Time_Per_Day", dataDf("Driving_Time_Per_Day").cast(StringType))
    
    dataDf1.show()
    
    val splits = Array(Double.NegativeInfinity, -12.5, 0.0, 12.5, Double.PositiveInfinity)
    
    var jk = new WeightOfEvidence()
      .setInputCol("Speed_FLM")
      .setOutputCol("Weight_Of_Evidence_Speed_FLM")
      .setLabelCol("Target")
      .transform(dataDf1)
    var bucketizer: Bucketizer = new Bucketizer()
      .setInputCol("Weight_Of_Evidence_Speed_FLM")
      .setOutputCol("Speed_FLM" + "_Target")
      .setSplits(splits)
    dataDf = bucketizer.transform(jk)
    
    
    That is what I am doing; I am trying to replicate WoE binning as done in R.
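
    If the goal is R-style WoE binning (bin the continuous variable first, then
    compute one WoE value per bin), my understanding is that the continuous
    column has to be discretized before WeightOfEvidence is applied; otherwise
    every distinct double value becomes its own "category". A rough sketch of
    what I mean, assuming QuantileDiscretizer is acceptable for the binning step
    (the names numericDf and Speed_FLM_bin are mine):

    // numericDf stands for the DataFrame before the StringType casts above.
    val discretizer = new QuantileDiscretizer()
      .setInputCol("Speed_FLM")
      .setOutputCol("Speed_FLM_bin")
      .setNumBuckets(10)
    // Bin the continuous feature, turn the bin id into a string category,
    // then compute one WoE value per bin.
    val binned = discretizer.fit(numericDf).transform(numericDf)
      .withColumn("Speed_FLM_bin", col("Speed_FLM_bin").cast(StringType))
    val withWoe = new WeightOfEvidence()
      .setInputCol("Speed_FLM_bin")
      .setOutputCol("Weight_Of_Evidence_Speed_FLM")
      .setLabelCol("Target")
      .transform(binned)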


