For some reason the path of the HDFS file is coming up in the data I am reading.

rowStructText.filter(s => s.length != 1).map(s => {
  println(s)
  s.split("\t").size
}).countByValue foreach println

However, the output (println()) on the executors still shows the characters of the HDFS file path, one character per line.

On Wed, Aug 5, 2015 at 10:30 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> I see the Spark job.
>
> The println statements print one character per line:
>
> 2
> 0
> 1
> 5
> /
> 0
> 8
> /
> 0
> 3
> /
> r
> e
> g
> u
> l
> a
> r
> /
> p
> a
> r
> t
> -
> m
>
> ....
>
> On Wed, Aug 5, 2015 at 10:27 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> val summary = rowStructText.map(s => s.split(",")).map(
>>   {
>>     s =>
>>       println(s)
>>       Summary(formatStringAsDate(s(0)),
>>         s(1).replaceAll("\"", "").toLong,
>>         s(3).replaceAll("\"", "").toLong,
>>         s(4).replaceAll("\"", "").toInt,
>>         s(5).replaceAll("\"", ""),
>>         s(6).replaceAll("\"", "").toInt,
>>         formatStringAsDate(s(7)),
>>         formatStringAsDate(s(8)),
>>         s(9).replaceAll("\"", "").toInt,
>>         s(10).replaceAll("\"", "").toInt,
>>         s(11).replaceAll("\"", "").toFloat,
>>         s(12).replaceAll("\"", "").toInt,
>>         s(13).replaceAll("\"", "").toInt,
>>         s(14).replaceAll("\"", "")
>>       )
>>   }
>> )
>>
>> summary.count
>>
>> AND
>>
>> rowStructText.map(s => {
>>   println(s)
>>   s.split(",").size
>> }).countByValue foreach println
>>
>> NEITHER PRINTS THE OUTPUT.
>>
>> When I open the Spark history server, it does not launch new SPARK JOBS for countByValue. Why is that, and when does it actually start a new job?
>>
>> On Wed, Aug 5, 2015 at 10:19 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>
>>> summary: org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[285] at map at <console>:169
>>> (1,517252)
>>>
>>> What does that mean?
>>>
>>> On Wed, Aug 5, 2015 at 10:14 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> Your data might have a format issue (fewer fields than you expect).
>>>>
>>>> Please try executing the following code to check whether all the lines have 14 fields:
>>>>
>>>> rowStructText.map(s => s.split(",").size).countByValue foreach println
>>>>
>>>> On Thu, Aug 6, 2015 at 1:01 PM, Randy Gelhausen <rgelhau...@hortonworks.com> wrote:
>>>>
>>>>> You likely have a problem with your parsing logic. I can’t see the data to know for sure, but since Spark is lazily evaluated, it doesn’t try to run your map until you execute the SQL that applies it to the data.
>>>>>
>>>>> That’s why your first paragraph can run (it’s only defining metadata), but paragraph 2 throws an error.
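A minimal sketch of the laziness Randy describes, written notebook-style on the assumption that the sc from Para-1 is available; the sample variable and its values are made up for illustration, not taken from the real file:

// Two well-formed sample lines plus one short line that breaks positional parsing.
val sample = sc.parallelize(Seq("2015-07-27,1,2", "2015-07-28,3,4", "bad-line"))

// Transformations are lazy: this line runs no job, so the bad row goes unnoticed here.
val parsed = sample.map(_.split(",")).map(a => (a(0), a(2).toInt))

// Only an action triggers execution; uncommenting this is where the
// ArrayIndexOutOfBoundsException from the short line would finally surface.
// parsed.count()

// Jeff's field-count check is safe to run first because it never indexes past the
// end of a row; for this sample it prints (3,2) and (1,1), in some order.
sample.map(_.split(",").size).countByValue().foreach(println)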
>>>>>
>>>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>>> Reply-To: "users@zeppelin.incubator.apache.org"
>>>>> Date: Thursday, August 6, 2015 at 12:37 AM
>>>>> To: "users@zeppelin.incubator.apache.org"
>>>>> Subject: Re: Unable to run count(*)
>>>>>
>>>>> %sql
>>>>> select * from summary
>>>>>
>>>>> Throws the same error.
>>>>>
>>>>> On Wed, Aug 5, 2015 at 9:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>
>>>>>> Para-1
>>>>>>
>>>>>> import java.text.SimpleDateFormat
>>>>>> import java.util.Calendar
>>>>>> import java.sql.Date
>>>>>>
>>>>>> def formatStringAsDate(dateStr: String) = new java.sql.Date(new SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime())
>>>>>>
>>>>>> //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
>>>>>> val rowStructText = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")
>>>>>> case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String, f6: Integer, f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: Integer, f13: Integer, f14: String)
>>>>>>
>>>>>> val summary = rowStructText.map(s => s.split(",")).map(
>>>>>>   {
>>>>>>     s =>
>>>>>>       Summary(formatStringAsDate(s(0)),
>>>>>>         s(1).replaceAll("\"", "").toLong,
>>>>>>         s(3).replaceAll("\"", "").toLong,
>>>>>>         s(4).replaceAll("\"", "").toInt,
>>>>>>         s(5).replaceAll("\"", ""),
>>>>>>         s(6).replaceAll("\"", "").toInt,
>>>>>>         formatStringAsDate(s(7)),
>>>>>>         formatStringAsDate(s(8)),
>>>>>>         s(9).replaceAll("\"", "").toInt,
>>>>>>         s(10).replaceAll("\"", "").toInt,
>>>>>>         s(11).replaceAll("\"", "").toFloat,
>>>>>>         s(12).replaceAll("\"", "").toInt,
>>>>>>         s(13).replaceAll("\"", "").toInt,
>>>>>>         s(14).replaceAll("\"", "")
>>>>>>       )
>>>>>>   }
>>>>>> ).toDF()
>>>>>> summary.registerTempTable("summary")
>>>>>>
>>>>>> Output:
>>>>>>
>>>>>> import java.text.SimpleDateFormat
>>>>>> import java.util.Calendar
>>>>>> import java.sql.Date
>>>>>> formatStringAsDate: (dateStr: String)java.sql.Date
>>>>>> rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz MapPartitionsRDD[152] at textFile at <console>:100
>>>>>> defined class Summary
>>>>>> summary: org.apache.spark.sql.DataFrame = [f1: date, f2: bigint, f3: bigint, f4: int, f5: string, f6: int, f7: date, f8: date, f9: int, f10: int, f11: float, f12: int, f13: int, f14: string]
>>>>>>
>>>>>> Para-2 (DOES NOT WORK)
>>>>>>
>>>>>> %sql select count(*) from summary
>>>>>>
>>>>>> Output:
>>>>>>
>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 29.0 failed 4 times, most recent failure: Lost task 0.3 in stage 29.0 (TID 1844, datanode-6-3486.phx01.dev.ebayc3.com): java.lang.ArrayIndexOutOfBoundsException: 1
>>>>>>   at $line184.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:109)
>>>>>>   at $line184.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:107)
>>>>>>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>   at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
>>>>>>   at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
>>>>>>   at org.apache.spark.sql.execution.RDDConversions$$anonfun$productToRowRdd$1.apply(ExistingRDD.scala:42)
>>>>>>   at org.apache.spark.sql.execution.RDDConversions$$anonfun$productToRowRdd$1.apply(ExistingRDD.scala:37)
>>>>>>   at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
>>>>>>   at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
>>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> Suggestions?
>>>>>>
>>>>>> --
>>>>>> Deepak
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang

--
Deepak
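Since the ArrayIndexOutOfBoundsException above comes from indexing into rows that have fewer comma-separated fields than the positional parse in Para-1 expects, one possible adjustment is to guard on field count before building Summary. This is a sketch only: it assumes the rowStructText RDD, formatStringAsDate, the Summary case class, and the toDF() implicits from Para-1 are already defined in the same notebook, and the unquote helper and the >= 15 threshold (s(14) is the highest index the parse touches) are illustrative choices rather than anything taken from the messages above.

// Strips surrounding double quotes, mirroring the replaceAll("\"", "") calls in Para-1.
def unquote(s: String): String = s.replaceAll("\"", "")

// split(",", -1) keeps trailing empty fields; the default split(",") drops them,
// which can shorten rows that end with commas.
val wellFormed = rowStructText
  .map(_.split(",", -1))
  .filter(_.length >= 15)   // drop rows too short for the positional parse below

val summaryDf = wellFormed.map { s =>
  Summary(formatStringAsDate(s(0)),
    unquote(s(1)).toLong,
    unquote(s(3)).toLong,
    unquote(s(4)).toInt,
    unquote(s(5)),
    unquote(s(6)).toInt,
    formatStringAsDate(s(7)),
    formatStringAsDate(s(8)),
    unquote(s(9)).toInt,
    unquote(s(10)).toInt,
    unquote(s(11)).toFloat,
    unquote(s(12)).toInt,
    unquote(s(13)).toInt,
    unquote(s(14)))
}.toDF()

summaryDf.registerTempTable("summary")

// Optional sanity check before querying: how many rows survived the guard.
println(s"kept ${wellFormed.count()} of ${rowStructText.count()} rows")

Note that this only guards against short rows; a row that passes the length check can still fail on an empty or non-numeric value or an unparseable date, so the field-size countByValue check suggested earlier is still worth running first.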