What is wrong with this code?
A condensed set of this code works in the spark-shell.
It does not work when deployed via a jar.
def
calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double]
= {
val spd = new PipelineDate(start)
val epd = new PipelineDate(end)
// filter for event1 events and return RDDs that are maps of
user_ids and 0
val f = sc.textFile(spd.toJsonHdfsFileName)
val ev1rdd =
f.filter(_.split(",")(0).split(":")(1).replace("\"","") ==
event1).map(line =>
(line.split(",")(2).split(":")(1).replace("\"",""),1)).cache
val ev1c = ev1rdd.count.toDouble
// do the same as above for event2 events, only substitute 0s with 1s
val ev2rdds = for {
dt <- PipelineDate.getPeriod(spd+1,epd)
val f1 = sc.textFile(dt.toJsonHdfsFileName)
} yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"","")
== event2).map(line =>
(line.split(",")(2).split(":")(1).replace("\"",""),1)).distinct)
// cache all event1 and event2 RDDs
ev2rdds.foreach(_.cache)
val cts = for {
ev2 <- ev2rdds
} yield ev2.count
val retent = for {
ev2rdd <- ev2rdds
val ret = ev1rdd.union(ev2rdd).groupByKey()
} yield ret.filter(e => e._2.length > 1 && e._2.filter(_==0).length>0)
val rcts = retent.map(_.count)
println("----------------------------------------------------------------------")
println(s"${rcts}")
println(s"${cts}")
for {
c <- rcts
} yield(ev1c/c.toDouble)
//Map(result:_*)
}
This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)
My question is: it does not appear that the
union().groupBy().filter(....) segment is working (the List(0,0)
output). The app is not failing, it finishes just fine.
Any ideas?
Ognen