Was the issue with print? Printing on the worker?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>

On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski <[email protected]> wrote:

> Strike that. Figured it out. Don't you just hate it when you fire off an
> email and you figure it out as it is being sent? ;)
> Ognen
>
>
> On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:
>
>> What is wrong with this code?
>>
>> A condensed set of this code works in the spark-shell.
>>
>> It does not work when deployed via a jar.
>>
>> def calcSimpleRetention(start:String,end:String,event1: String,event2:String):List[Double] = {
>>   val spd = new PipelineDate(start)
>>   val epd = new PipelineDate(end)
>>   // filter for event1 events and return RDDs that are maps of user_ids and 0
>>   val f = sc.textFile(spd.toJsonHdfsFileName)
>>   val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
>>     .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).cache
>>   val ev1c = ev1rdd.count.toDouble
>>
>>   // do the same as above for event2 events, only substitute 0s with 1s
>>   val ev2rdds = for {
>>     dt <- PipelineDate.getPeriod(spd+1,epd)
>>     val f1 = sc.textFile(dt.toJsonHdfsFileName)
>>   } yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"","") == event2)
>>     .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).distinct)
>>
>>   // cache all event1 and event2 RDDs
>>   ev2rdds.foreach(_.cache)
>>   val cts = for {
>>     ev2 <- ev2rdds
>>   } yield ev2.count
>>
>>   val retent = for {
>>     ev2rdd <- ev2rdds
>>     val ret = ev1rdd.union(ev2rdd).groupByKey()
>>   } yield ret.filter(e => e._2.length > 1 && e._2.filter(_==0).length>0)
>>
>>   val rcts = retent.map(_.count)
>>   println("----------------------------------------------------------------------")
>>
>>   println(s"${rcts}")
>>   println(s"${cts}")
>>
>>   for {
>>     c <- rcts
>>   } yield(ev1c/c.toDouble)
>>   //Map(result:_*)
>> }
>>
>> This is what this code prints:
>> List(0, 0)
>> List(785912, 825254)
>> List(Infinity, Infinity)
>>
>> My question is: it does not appear that the
>> union().groupBy().filter(....) segment is working (the List(0,0) output).
>> The app is not failing, it finishes just fine.
>>
>> Any ideas?
>> Ognen
>>
>
> --
> Some people, when confronted with a problem, think "I know, I'll use
> regular expressions." Now they have two problems.
> -- Jamie Zawinski
>
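
The thread does not record what the fix turned out to be. One candidate is visible in the quoted code itself: the comments say event1 users are paired with 0 and event2 users with 1, but both map calls emit 1, so the `_==0` filter can never match and every retained count comes out as 0. Below is a minimal sketch of that effect, assuming plain Scala collections as a stand-in for RDDs (`RetentionSketch` and `retained` are hypothetical names, not part of the original code):

```scala
// Spark-free sketch: Seq[(String, Int)] stands in for RDD[(String, Int)],
// ++ stands in for union, and groupBy stands in for groupByKey.
// All names here are hypothetical and exist only for illustration.
object RetentionSketch {
  // Count users present in both inputs, using the same predicate as the quoted filter.
  def retained(ev1: Seq[(String, Int)], ev2: Seq[(String, Int)]): Int =
    (ev1 ++ ev2)
      .groupBy(_._1)                                          // user_id -> all pairs for that user
      .map { case (user, pairs) => (user, pairs.map(_._2)) }  // user_id -> list of tags
      .count { case (_, tags) => tags.length > 1 && tags.contains(0) }

  def main(args: Array[String]): Unit = {
    val ev2 = Seq(("u1", 1), ("u3", 1))

    // As quoted: event1 users are tagged 1, so no group ever contains a 0.
    val ev1AsQuoted = Seq(("u1", 1), ("u2", 1))
    println(retained(ev1AsQuoted, ev2)) // prints 0

    // As the code comment describes: event1 users are tagged 0.
    val ev1Tagged0 = Seq(("u1", 0), ("u2", 0))
    println(retained(ev1Tagged0, ev2)) // prints 1
  }
}
```

A retained count of 0 would also account for the `List(Infinity, Infinity)` line, since dividing a positive Double by 0.0 yields Infinity in Scala.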
