Hi Sanjay,
I tried running your code in the Spark shell, piece by piece:
// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)
val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with
val r2 = r1.map(line => line.split(',')) // RDD[Array[String]]: so far, so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
    // Returns a pair (String, String), good
  }
  else {
    "" // Returns a String, bad
  }
}) // RDD[Serializable]: PROBLEM
I could not even apply flatMapValues, because the filtered RDD passed to it is
an RDD[Serializable], not a pair RDD. I am surprised that your code compiled
at all.
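To see where the types go wrong without a cluster, here is a plain-Scala sketch of the same if/else (BranchTypeDemo and toRecord are just illustrative names). Because the two branches return unrelated types, Scala infers their common supertype, which for a Tuple2 and a String is java.io.Serializable. That is why r3 ends up as RDD[Serializable], and flatMapValues, which Spark only offers on RDDs of pairs, is not available on it.

```scala
object BranchTypeDemo {
  // Same shape as the r3 lambda, with no result-type annotation:
  // the if-branch yields a (String, String), the else-branch a String.
  def toRecord(line: String) = {
    val fields = line.split(',')
    if (fields.length >= 11 && !fields(0).contains("VAERS_ID"))
      (fields(0), fields(1) + "\t" + fields(3) + "\t" + fields(5) + "\t" + fields(7) + "\t" + fields(9))
    else
      ""
  }

  // The inferred result type is the branches' common supertype, so assigning
  // it to java.io.Serializable compiles...
  val asSerializable: java.io.Serializable = toRecord("VAERS_ID,SYMPTOM1")
  // ...while asking for a pair does not, since the compiler no longer knows it is one:
  // val asPair: (String, String) = toRecord("VAERS_ID,SYMPTOM1")
}
```

Returning ("","") from the else-branch, as in the fix below, makes both branches (String, String), so the inferred type becomes a pair again.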
With the following changes, your snippet works as intended:
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ("","")
  }
}).filter(pair => pair._1.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
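If you would rather not rely on the ("","") sentinel plus a filter, one alternative is to return an Option and let flatMap drop the None rows. The sketch below uses only the standard library: it runs the same steps on a local Seq instead of an RDD (ReactionPairs, toPair and explode are made-up names), and the last flatMap emulates what flatMapValues does on a pair RDD. RDD.flatMap should also accept the Option-returning function through Scala's implicit Option-to-Iterable conversion, but treat the Spark side as an untested sketch.

```scala
object ReactionPairs {
  // Hypothetical helper: Some((id, tab-joined reactions)) for a data row,
  // None for the header or any row with fewer than 11 fields.
  def toPair(fields: Array[String]): Option[(String, String)] =
    if (fields.length >= 11 && !fields(0).contains("VAERS_ID"))
      Some((fields(0), fields(1) + "\t" + fields(3) + "\t" + fields(5) + "\t" + fields(7) + "\t" + fields(9)))
    else
      None

  // Local stand-in for the RDD pipeline:
  // map(split on ',') -> flatMap(toPair) -> flatMapValues(split on tab)
  def explode(lines: Seq[String]): Seq[(String, String)] =
    lines
      .map(line => line.split(','))
      .flatMap(fields => toPair(fields))   // None rows disappear, no sentinel or filter needed
      .flatMap { case (id, reactions) =>   // emulates flatMapValues: keep the key, expand the value
        reactions.split('\t').toSeq.map(reaction => (id, reaction))
      }
}
```

On the RDD, only the middle step would change: .map(line => line.split(',')).flatMap(fields => ReactionPairs.toPair(fields)), followed by the existing flatMapValues and saveAsTextFile calls.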
Please note that the corrected snippet still saves lines like (025126,Chills),
i.e. wrapped in parentheses, because saveAsTextFile writes out each element's
toString. If you want to get rid of them, add another map that turns each pair
into a String before saving.
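A minimal sketch of that extra map step, assuming comma-separated output is what you want (PairFormatting and toCsvLine are illustrative names):

```scala
object PairFormatting {
  // saveAsTextFile writes each element's toString; for a Tuple2 that is
  // "(025126,Chills)". Formatting the pair explicitly drops the parentheses.
  def toCsvLine(pair: (String, String)): String = pair match {
    case (id, reaction) => id + "," + reaction
  }
}
```

In the pipeline it would sit between flatMapValues and saveAsTextFile, e.g. .flatMapValues(skus => skus.split('\t')).map(PairFormatting.toCsvLine).saveAsTextFile("/data/vaers/msfx/reac/" + outFile); an inline .map { case (id, reaction) => id + "," + reaction } does the same job.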
Kapil
From: Sanjay Subramanian
Sent: 31 December 2014 13:42
Subject: FlatMapValues
hey guys
My dataset looks like this:
025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10
Intended output is
==================
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia
My code is as follows, but flatMapValues does not work even though I have
created the pair RDD.
************************************************************************
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
************************************************************************
thanks
sanjay