No.
It was a logical error. The line

  val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
    .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).cache

should have mapped to ,0, not ,1
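
To make the fix concrete, here is a minimal sketch that uses plain Scala collections as a stand-in for the RDDs (no Spark involved; the user ids and the helper name `retained` are made up for illustration). Event1 users carry tag 0 and event2 users carry tag 1. After the union and grouping, a user counts as retained only if its group has more than one entry and contains a 0. If event1 is tagged 1 as well, no group ever contains a 0, so every count is 0 and the final Double division yields Infinity.

```scala
object TagSketch {
  // Hypothetical stand-in for union + groupByKey + filter + count on RDDs.
  def retained(ev1: Seq[(String, Int)], ev2: Seq[(String, Int)]): Int =
    (ev1 ++ ev2)
      .groupBy(_._1)                                    // user_id -> all (user_id, tag) pairs
      .map { case (user, pairs) => (user, pairs.map(_._2)) }
      .count { case (_, tags) => tags.length > 1 && tags.contains(0) }

  val users1 = Seq("u1", "u2", "u3")                    // made-up event1 users
  val users2 = Seq("u2", "u3", "u4")                    // made-up event2 users

  val ev2   = users2.map(u => (u, 1))
  val buggy = retained(users1.map(u => (u, 1)), ev2)    // event1 tagged 1 (the mistake)
  val fixed = retained(users1.map(u => (u, 0)), ev2)    // event1 tagged 0 (the fix)

  def main(args: Array[String]): Unit = {
    println(s"buggy=$buggy fixed=$fixed")
    println(users1.length.toDouble / buggy.toDouble)    // Double division by zero
  }
}
```

With this made-up data the buggy tagging retains 0 users and the fixed tagging retains 2 (u2 and u3); dividing by the zero count as a Double gives Infinity rather than an exception, which is why the job finishes without failing.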
I have had the most awful time figuring out these "looped" things. It
seems next to impossible to run a .filter() operation in a for loop,
but it does seem to work if you yield the .filter().
I still don't understand why that is...
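
One possible explanation, offered as a sketch rather than a diagnosis of the original job: a Scala `for` without `yield` desugars to `foreach` and returns Unit, so a `.filter()` called in its body builds a new collection (or, in Spark, a new lazy RDD) and then throws it away; nothing is filtered in place. With `yield`, the same `for` desugars to `map`, and the filtered results are collected and returned. The example below uses plain Lists with made-up data instead of RDDs.

```scala
object ForYieldSketch {
  // Made-up stand-ins for the per-day RDDs.
  val days = List(List(1, 2, 3, 4), List(5, 6, 7, 8))

  // No yield: desugars to days.foreach(d => d.filter(...)); each filtered list is discarded.
  val withoutYield: Unit = for (d <- days) d.filter(_ % 2 == 0)

  // With yield: desugars to days.map(d => d.filter(...)); the filtered lists are kept.
  val withYield: List[List[Int]] = for (d <- days) yield d.filter(_ % 2 == 0)

  def main(args: Array[String]): Unit = {
    println(withoutYield)
    println(withYield)
  }
}
```

The same reasoning would apply to RDDs, since `filter` is a transformation that returns a new RDD and leaves the original untouched.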
Ognen
On 3/7/14, 1:05 PM, Mayur Rustagi wrote:
Was the issue with print?
Printing on a worker?
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>
On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski wrote:
Strike that. Figured it out. Don't you just hate it when you fire
off an email and you figure it out as it is being sent? ;)
Ognen
On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:
What is wrong with this code?
A condensed set of this code works in the spark-shell.
It does not work when deployed via a jar.
def calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double] = {
  val spd = new PipelineDate(start)
  val epd = new PipelineDate(end)
  // filter for event1 events and return RDDs that are maps of user_ids and 0
  val f = sc.textFile(spd.toJsonHdfsFileName)
  val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
    .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).cache
  val ev1c = ev1rdd.count.toDouble
  // do the same as above for event2 events, only substitute 0s with 1s
  val ev2rdds = for {
    dt <- PipelineDate.getPeriod(spd+1,epd)
    val f1 = sc.textFile(dt.toJsonHdfsFileName)
  } yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"","") == event2)
    .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).distinct)
  // cache all event1 and event2 RDDs
  ev2rdds.foreach(_.cache)
  val cts = for {
    ev2 <- ev2rdds
  } yield ev2.count
  val retent = for {
    ev2rdd <- ev2rdds
    val ret = ev1rdd.union(ev2rdd).groupByKey()
  } yield ret.filter(e => e._2.length > 1 && e._2.filter(_==0).length>0)
  val rcts = retent.map(_.count)
  println("----------------------------------------------------------------------")
  println(s"${rcts}")
  println(s"${cts}")
  for {
    c <- rcts
  } yield(ev1c/c.toDouble)
  //Map(result:_*)
}
This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)
My question is: it does not appear that the
union().groupBy().filter(....) segment is working (the
List(0,0) output). The app is not failing, it finishes just fine.
Any ideas?
Ognen
--
Some people, when confronted with a problem, think "I know, I'll
use regular expressions." Now they have two problems.
-- Jamie Zawinski