Although not elegantly, I got the output via my code, but I totally agree on parsing the line 5 times (that's really bad). I will add your suggested logic and check it out. I have a "long" way to the finish line. I am re-architecting my entire Hadoop code and getting it onto Spark. Check out what I do at www.medicalsidefx.org. Primarily an iPhone app, but underlying it are Lucene, Hadoop and, hopefully soon in 2015, Spark :-)

From: Sean Owen <so...@cloudera.com>
To: Sanjay Subramanian <sanjaysubraman...@yahoo.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Sent: Wednesday, December 24, 2014 8:56 AM
Subject: Re: How to identify erroneous input record ?

I don't believe that works, since your map function does not return a value for lines shorter than 13 tokens. You should use flatMap and Some/None. (You probably want to not parse the string 5 times, either.)
val demoRddFilterMap = demoRddFilter.flatMap { line =>
  val tokens = line.split('$')
  if (tokens.length >= 13) {
    val parsed = tokens(0) + "~" + tokens(5) + "~" + tokens(11) + "~" + tokens(12)
    Some(parsed)
  } else {
    None
  }
}

On Wed, Dec 24, 2014 at 4:35 PM, Sanjay Subramanian
<sanjaysubraman...@yahoo.com.invalid> wrote:
> DOH. Looks like I did not have enough coffee before I asked this :-)
> I added the if statement...
>
> var demoRddFilter = demoRdd.filter(line =>
>   !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") ||
>   !line.contains("primaryid$caseid$caseversion"))
> var demoRddFilterMap = demoRddFilter.map(line => {
>   if (line.split('$').length >= 13) {
>     line.split('$')(0) + "~" + line.split('$')(5) + "~" +
>     line.split('$')(11) + "~" + line.split('$')(12)
>   }
> })
>
> ________________________________
> From: Sanjay Subramanian <sanjaysubraman...@yahoo.com.INVALID>
> To: "user@spark.apache.org" <user@spark.apache.org>
> Sent: Wednesday, December 24, 2014 8:28 AM
> Subject: How to identify erroneous input record ?
>
> hey guys
>
> One of my input records has a problem that makes the code fail.
>
> var demoRddFilter = demoRdd.filter(line =>
>   !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") ||
>   !line.contains("primaryid$caseid$caseversion"))
>
> var demoRddFilterMap = demoRddFilter.map(line => line.split('$')(0) + "~" +
>   line.split('$')(5) + "~" + line.split('$')(11) + "~" + line.split('$')(12))
>
> demoRddFilterMap.saveAsTextFile("/data/aers/msfx/demo/" + outFile)
>
> This is possibly happening because perhaps one input record may not have 13
> fields.
>
> If this were Hadoop mapper code, I have 2 ways to solve this:
>
> 1. test the number of fields of each line before applying the map function
>
> 2. enclose the mapping function in a try/catch block so that the mapping
>    function only fails for the erroneous record
>
> How do I implement 1. or 2. in the Spark code?
>
> Thanks
>
> sanjay
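For reference, option 2 from the quoted question (enclosing the mapping in try/catch) can also be expressed with flatMap. This is a minimal sketch, not code from the thread itself: it reuses the demoRddFilter RDD from above, and it assumes scala.util.Try is an acceptable stand-in for an explicit try/catch block.

import scala.util.Try

val demoRddParsedSafely = demoRddFilter.flatMap { line =>
  Try {
    val tokens = line.split('$')
    // tokens(12) throws ArrayIndexOutOfBoundsException for short lines;
    // Try captures that as a Failure instead of failing the whole job
    tokens(0) + "~" + tokens(5) + "~" + tokens(11) + "~" + tokens(12)
  }.toOption // Success -> Some(parsed), Failure -> None; flatMap drops the Nones
}

This behaves like the explicit length check, but also survives any other parsing error. To actually identify the erroneous records rather than silently drop them, you could keep the failing lines instead (for example, filter on Try(...).isFailure) and save them to a separate output path for inspection.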