Github user Srinivasan2Nagarajan commented on the issue:
https://github.com/apache/spark/pull/10803
Hi,
I have used this class in my code, but it does not give the output I expected: the values it produces are not continuous.
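To make that concrete, here is a minimal sketch (toy data and hypothetical column names, using the WeightOfEvidence class defined further down, and assuming an existing SparkSession named spark as in the script below) of the step-wise mapping I mean:

import spark.implicits._

// Toy data only for illustration: a numeric feature already cast to string,
// plus a binary target column.
val toyDf = Seq(
  ("0.10", 1.0), ("0.10", 0.0),
  ("0.25", 1.0),
  ("0.40", 0.0), ("0.40", 0.0)
).toDF("feature", "label")

val toyWoe = new WeightOfEvidence()
  .setInputCol("feature")
  .setLabelCol("label")
  .setOutputCol("feature_woe")
  .transform(toyDf)

// One WOE value per distinct input value, so the output column is step-wise.
toyWoe.show()

The full code I run is below: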
val startTimeMillis = System.currentTimeMillis()
val s1 = System.nanoTime()
import org.apache.spark.sql.types.{ StringType, DoubleType, IntegerType}
import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.feature.{ Tokenizer, HashingTF, IDF }
import org.apache.spark.sql
import org.apache.spark.rdd.RDD
import org.apache.spark.annotation.Since
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.feature.{ VectorAssembler, StandardScaler }
import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset, Column}
import org.apache.spark.sql.functions._
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.linalg.{ Vector, Vectors, SparseVector }
import scala.util.parsing.combinator._
import org.apache.spark.ml.feature._
import sqlContext.implicits._
import org.apache.spark.ml.feature.Bucketizer
var dataDf = spark.sql("""
  select
    (.75 * longacc_0to1point5 + 2.25 * longacc_1point5to3 + 3.75 * longacc_3to4point5 +
     5.25 * longacc_4point5to6 + 6.75 * longacc_6to7point5 + 8.25 * longacc_7point5to9 +
     9.75 * longacc_9to10point5 + 11.25 * longacc_10point5to12 + 12.75 * longacc_12to13point5 +
     14.5 * longacc_13point5) /
    (longacc_0to1point5 + longacc_1point5to3 + longacc_3to4point5 + longacc_4point5to6 +
     longacc_6to7point5 + longacc_7point5to9 + longacc_9to10point5 + longacc_10point5to12 +
     longacc_12to13point5 + longacc_13point5) as Long_Acc,
    ((5 * Speed_FLM_0to10 + 15 * Speed_FLM_10to20 + 25 * Speed_FLM_20to30 + 40 * Speed_FLM_30to50 +
      65 * Speed_FLM_50to80 + 100 * Speed_FLM_80to120 + 135 * Speed_FLM_120to150 +
      175 * Speed_FLM_150to200 + 225 * Speed_FLM_200toInf) /
     (Speed_FLM_0to10 + Speed_FLM_10to20 + Speed_FLM_20to30 + Speed_FLM_30to50 + Speed_FLM_50to80 +
      Speed_FLM_80to120 + Speed_FLM_120to150 + Speed_FLM_150to200 + Speed_FLM_200toInf)) as Speed_FLM,
    ((1 * dist_0to2km + 3.5 * dist_2to5km + 7.5 * dist_5to10km + 15 * dist_10to20km +
      35 * dist_20to50km + 100 * dist_50to150km + 175 * dist_150to500km + 600 * dist_500km) /
     (dist_0to2km + dist_2to5km + dist_5to10km + dist_10to20km + dist_20to50km + dist_50to150km +
      dist_150to500km + dist_500km)) as Dist,
    ((5 * Pedal_0to10perc + 15 * Pedal_10to20perc + 25 * Pedal_20to30perc + 35 * Pedal_30to40perc +
      45 * Pedal_40to50perc + 55 * Pedal_50to60perc + 65 * Pedal_60to70perc + 75 * Pedal_70to80perc +
      85 * Pedal_80to90perc + 95 * Pedal_90perc) /
     (Pedal_0to10perc + Pedal_10to20perc + Pedal_20to30perc + Pedal_30to40perc + Pedal_40to50perc +
      Pedal_50to60perc + Pedal_60to70perc + Pedal_70to80perc + Pedal_80to90perc + Pedal_90perc)) as Pedal,
    ((1 * time_1min + 3 * time_1to5min + 7.5 * time_5to10min + 20 * time_10to30min +
      45 * time_30to60min + 90 * time_60to120min + 210 * time_120to300min + 400 * time_300min) /
     (time_1min + time_1to5min + time_5to10min + time_10to30min + time_30to60min +
      time_60to120min + time_120to300min + time_300min)) as Time,
    ((1 * Pauses_less_1min + 5 * Pauses_1_10min + 20 * Pauses_10_30min + 45 * Pauses_30_60min +
      90 * Pauses_60_120min + 210 * Pauses_120_300min + 3750 * Pauses_300_24hr +
      5040 * Pauses_24hr_1week + 21600 * Pauses_1week_1month + 43200 * Pauses_greater_30days) /
     (Pauses_less_1min + Pauses_1_10min + Pauses_10_30min + Pauses_30_60min + Pauses_60_120min +
      Pauses_120_300min + Pauses_300_24hr + Pauses_24hr_1week + Pauses_1week_1month +
      Pauses_greater_30days)) as Pauses,
    ((750 * Speed_RPM_1000 + 1250 * Speed_RPM_1000to1500 + 1750 * Speed_RPM_1500to2000 +
      2250 * Speed_RPM_2000to2500 + 2750 * Speed_RPM_2500to3000 + 3250 * Speed_RPM_3000to3500 +
      4750 * Speed_RPM_3500to4000 + 4250 * Speed_RPM_4000to4500 + 4750 * Speed_RPM_4500to5000 +
      5250 * Speed_RPM_5000to5500 + 5750 * Speed_RPM_5500to6000 + 6250 * Speed_RPM_6000to6500 +
      6750 * Speed_RPM_6500to7000 + 7250 * Speed_RPM_7000) /
     (Speed_RPM_1000 + Speed_RPM_1000to1500 + Speed_RPM_1500to2000 + Speed_RPM_2000to2500 +
      Speed_RPM_2500to3000 + Speed_RPM_3000to3500 + Speed_RPM_3500to4000 + Speed_RPM_4000to4500 +
      Speed_RPM_4500to5000 + Speed_RPM_5000to5500 + Speed_RPM_5500to6000 + Speed_RPM_6000to6500 +
      Speed_RPM_6500to7000 + Speed_RPM_7000)) as Speed_RPM,
    Speed_ReverseDirection, Speed_FLM_Standstill, Distance_Per_day, Starts_Per_Day,
    Driving_Time_Per_Day, Target
  from fisher_vin_data""")
dataDf = dataDf.na.fill(0)
println("Needed Output 1 ----> " + dataDf.count())
def getMaximumValue(x: Column) =
  dataDf.agg(max(x)).collect().map(r => r.toSeq(0).asInstanceOf[Double]).toList(0)
def getMinimumValue(x: Column) =
  dataDf.agg(min(x)).collect().map(r => r.toSeq(0).asInstanceOf[Double]).toList(0)
def Normalize(x: Column): Column = {
  val max_x = getMaximumValue(x.cast("double"))
  val min_x = getMinimumValue(x.cast("double"))
  (x.cast("double") - min_x) / (max_x - min_x)
}
dataDf = dataDf.withColumn("Long_Acc",
Normalize(dataDf.col("Long_Acc"))).withColumn("Speed_FLM",
Normalize(dataDf.col("Speed_FLM"))).withColumn("Dist",
Normalize(dataDf.col("Dist"))).withColumn("Pedal",
Normalize(dataDf.col("Pedal"))).withColumn("Time",
Normalize(dataDf.col("Time"))).withColumn("Pauses",
Normalize(dataDf.col("Pauses"))).withColumn("Speed_RPM",
Normalize(dataDf.col("Speed_RPM"))).withColumn("Speed_ReverseDirection",
Normalize(dataDf.col("Speed_ReverseDirection"))).withColumn("Speed_FLM_Standstill",
Normalize(dataDf.col("Speed_FLM_Standstill"))).withColumn("Distance_Per_day",
Normalize(dataDf.col("Distance_Per_day"))).withColumn("Starts_Per_Day",
Normalize(dataDf.col("Starts_Per_Day"))).withColumn("Driving_Time_Per_Day",
Normalize(dataDf.col("Driving_Time_Per_Day"))).withColumn("Target",
Normalize(dataDf.col("Target")))
//------------------------------------------------------
trait HasInputCol extends Params {
final val inputCol: Param[String] = new Param[String](this, "inputCol",
"input column name")
final def getInputCol: String = $(inputCol)
}
trait HasLabelCol extends Params {
final val labelCol: Param[String] = new Param[String](this, "labelCol",
"label column name")
setDefault(labelCol, "label")
final def getLabelCol: String = $(labelCol)
}
trait HasOutputCol extends Params {
final val outputCol: Param[String] = new Param[String](this, "outputCol",
"output column name")
setDefault(outputCol, uid + "__output")
final def getOutputCol: String = $(outputCol)
}
class WeightOfEvidence(override val uid: String) extends HasInputCol with
HasLabelCol with HasOutputCol {
def this() = this(Identifiable.randomUID("woe"))
def setInputCol(value: String): this.type = set(inputCol, value)
def setLabelCol(value: String): this.type = set(labelCol, value)
def setOutputCol(value: String): this.type = set(outputCol, value)
def transform(dataset: DataFrame): DataFrame = {
// validateParams()  -------------- ***** Important ***** -----------
val sorted_dataset = dataset.sort($(inputCol))
val woeTable = WeightOfEvidence.getWoeTable(sorted_dataset,
$(inputCol), $(labelCol))
val woeMap = woeTable.map(r => {
val category = r.getAs[String]($(inputCol))
val woe = r.getAs[Double]("woe")
(category, woe)
}).rdd.collectAsMap
val trans = udf { (factor: String) =>
woeMap.get(factor)
}
dataset.withColumn($(outputCol), trans(col($(inputCol))))
}
override def copy(extra: ParamMap): WeightOfEvidence = defaultCopy(extra)
}
object WeightOfEvidence {
def getWoeTable(dataset: DataFrame, categoryCol: String, labelCol:
String): DataFrame = {
val data = dataset.select(categoryCol, labelCol)
val tmpTableName = "woe_temp"
data.createOrReplaceTempView(tmpTableName)
val err = 0.01
val query =
s"""
|SELECT
|$categoryCol,
|SUM (IF(CAST ($labelCol AS DOUBLE)=1, 1, 0)) AS 1count,
|SUM (IF(CAST ($labelCol AS DOUBLE)=0, 1, 0)) AS 0count
|FROM $tmpTableName
|GROUP BY $categoryCol
""".stripMargin
val groupResult = data.sqlContext.sql(query).cache()
val total0 =
groupResult.selectExpr("SUM(0count)").first().getAs[Long](0).toDouble
val total1 =
groupResult.selectExpr("SUM(1count)").first().getAs[Long](0).toDouble
groupResult.selectExpr(
categoryCol,
s"1count/$total1 AS p1",
s"0count/$total0 AS p0",
s"LOG(($err + 1count) / $total1 * $total0 / (0count + $err)) AS woe")
}
}
import org.apache.spark.sql.Row
// val test = dataDf.select("Speed_FLM").rdd.map {
//   case Row(string_val: String) => (string_val, functionApplying(string_val, dataDf))
// }.toDF("Speed_FLM", "Speed_FLM_1")
// test.show()
var dataDf1 = dataDf.withColumn("Speed_FLM" ,
dataDf("Speed_FLM").cast(StringType) ).withColumn("Long_Acc" ,
dataDf("Long_Acc").cast(StringType) ).withColumn("Dist" ,
dataDf("Dist").cast(StringType) ).withColumn("Pedal" ,
dataDf("Pedal").cast(StringType) ).withColumn("Time" ,
dataDf("Time").cast(StringType) ).withColumn("Speed_RPM" ,
dataDf("Speed_RPM").cast(StringType) ).withColumn("Speed_ReverseDirection" ,
dataDf("Speed_ReverseDirection").cast(StringType)
).withColumn("Speed_FLM_Standstill" ,
dataDf("Speed_FLM_Standstill").cast(StringType) ).withColumn("Distance_Per_day"
, dataDf("Distance_Per_day").cast(StringType) ).withColumn("Starts_Per_Day" ,
dataDf("Starts_Per_Day").cast(StringType) ).withColumn("Driving_Time_Per_Day" ,
dataDf("Driving_Time_Per_Day").cast(StringType) )
dataDf1.show()
val splits = Array(Double.NegativeInfinity, -12.5, 0.0, 12.5,
Double.PositiveInfinity)
var jk = new WeightOfEvidence()
  .setInputCol("Speed_FLM")
  .setOutputCol("Weight_Of_Evidence_Speed_FLM")
  .setLabelCol("Target")
  .transform(dataDf1)
var bucketizer: Bucketizer = new Bucketizer()
  .setInputCol("Weight_Of_Evidence_Speed_FLM")
  .setOutputCol("Speed_FLM" + "_Target")
  .setSplits(splits)
dataDf = bucketizer.transform(jk)
That is what I do; I am trying to replicate WOE binning in R.
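For reference, the quantity getWoeTable computes per category is the usual weight of evidence with a small smoothing constant err; a worked sketch on made-up counts (the numbers are only illustrative):

// Worked example of the per-category value produced by the getWoeTable query,
// with made-up counts; err = 0.01 is the same smoothing constant as above.
val err = 0.01
val total1 = 100.0   // all rows with label 1
val total0 = 400.0   // all rows with label 0
val count1 = 30.0    // rows with label 1 in this category
val count0 = 50.0    // rows with label 0 in this category

// woe ~ log( P(category | label = 1) / P(category | label = 0) )
val woe = math.log(((err + count1) / total1) * (total0 / (count0 + err)))
println(woe)         // about 0.876 for these counts

The Bucketizer at the end then assigns each of those WOE values to one of the four ranges defined in splits.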