What is wrong with this code?

A condensed set of this code works in the spark-shell.

It does not work when deployed via a jar.

def calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double] = {
    val spd = new PipelineDate(start)
    val epd = new PipelineDate(end)
// filter for event1 events and return RDDs that are maps of user_ids and 0
    val f = sc.textFile(spd.toJsonHdfsFileName)
val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1).map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).cache
    val ev1c = ev1rdd.count.toDouble

    // do the same as above for event2 events, only substitute 0s with 1s
    val ev2rdds = for {
       dt <- PipelineDate.getPeriod(spd+1,epd)
       val f1 = sc.textFile(dt.toJsonHdfsFileName)
} yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"","") == event2).map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).distinct)

    // cache all event1 and event2 RDDs
    ev2rdds.foreach(_.cache)
    val cts = for {
      ev2 <- ev2rdds
    } yield ev2.count

    val retent = for {
      ev2rdd <- ev2rdds
      val ret = ev1rdd.union(ev2rdd).groupByKey()
    } yield ret.filter(e => e._2.length > 1 && e._2.filter(_==0).length>0)

    val rcts = retent.map(_.count)
println("----------------------------------------------------------------------")
    println(s"${rcts}")
    println(s"${cts}")

    for {
      c <- rcts
    } yield(ev1c/c.toDouble)
    //Map(result:_*)
  }

This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)

My question is: it does not appear that the union().groupBy().filter(....) segment is working (the List(0,0) output). The app is not failing, it finishes just fine.

Any ideas?
Ognen

Reply via email to