Figured it out:

val summary = rowStructText.filter(s => s.length != 1).map(s => s.split("\t"))

AND "select * from summary" shows the table.
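(For anyone landing on this thread later, here is a sketch of how the pieces fit together end to end. It is illustrative only, not the exact notebook code: it reuses the Summary case class, formatStringAsDate and the file path from Para-1 further down, assumes the file is tab-delimited as in the filter/split line above, and adds a field-count guard that is not in the original code. sc and sqlContext are the ones provided by the Zeppelin Spark interpreter.)

import java.text.SimpleDateFormat
import java.sql.Date

def formatStringAsDate(dateStr: String) = new java.sql.Date(new SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime())

case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String,
                   f6: Integer, f7: Date, f8: Date, f9: Integer, f10: Integer,
                   f11: Float, f12: Integer, f13: Integer, f14: String)

// Needed for .toDF(); Zeppelin's Spark interpreter may already have this in scope.
import sqlContext.implicits._

val rowStructText = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")

val cleaned    = rowStructText.filter(s => s.length != 1)   // drop the stray one-character lines
val fields     = cleaned.map(s => s.split("\t"))            // tab-delimited, not comma-delimited
val wellFormed = fields.filter(f => f.length >= 15)         // guard: Summary reads indices 0..14

val summary = wellFormed.map(f => Summary(
  formatStringAsDate(f(0)),
  f(1).replaceAll("\"", "").toLong,
  f(3).replaceAll("\"", "").toLong,
  f(4).replaceAll("\"", "").toInt,
  f(5).replaceAll("\"", ""),
  f(6).replaceAll("\"", "").toInt,
  formatStringAsDate(f(7)),
  formatStringAsDate(f(8)),
  f(9).replaceAll("\"", "").toInt,
  f(10).replaceAll("\"", "").toInt,
  f(11).replaceAll("\"", "").toFloat,
  f(12).replaceAll("\"", "").toInt,
  f(13).replaceAll("\"", "").toInt,
  f(14).replaceAll("\"", ""))).toDF()

summary.registerTempTable("summary")

The length filter drops the stray one-character lines, and the field-count guard skips any remaining short rows instead of letting the Summary constructor throw on a missing index.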
On Wed, Aug 5, 2015 at 10:37 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> For some reason the path of the HDFS file is coming up in the data I am
> reading.
>
> rowStructText.filter(s => s.length != 1).map(s => {
>     println(s)
>     s.split("\t").size
> }).countByValue foreach println
>
> However, the output (println()) on the executors still has the characters
> of the HDFS file, one character per line.
>
> On Wed, Aug 5, 2015 at 10:30 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> I see the Spark job.
>>
>> The println statements print one character per line:
>>
>> 2
>> 0
>> 1
>> 5
>> /
>> 0
>> 8
>> /
>> 0
>> 3
>> /
>> r
>> e
>> g
>> u
>> l
>> a
>> r
>> /
>> p
>> a
>> r
>> t
>> -
>> m
>>
>> ....
>>
>> On Wed, Aug 5, 2015 at 10:27 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>
>>> val summary = rowStructText.map(s => s.split(",")).map(
>>>   {
>>>     s =>
>>>       println(s)
>>>       Summary(formatStringAsDate(s(0)),
>>>         s(1).replaceAll("\"", "").toLong,
>>>         s(3).replaceAll("\"", "").toLong,
>>>         s(4).replaceAll("\"", "").toInt,
>>>         s(5).replaceAll("\"", ""),
>>>         s(6).replaceAll("\"", "").toInt,
>>>         formatStringAsDate(s(7)),
>>>         formatStringAsDate(s(8)),
>>>         s(9).replaceAll("\"", "").toInt,
>>>         s(10).replaceAll("\"", "").toInt,
>>>         s(11).replaceAll("\"", "").toFloat,
>>>         s(12).replaceAll("\"", "").toInt,
>>>         s(13).replaceAll("\"", "").toInt,
>>>         s(14).replaceAll("\"", "")
>>>       )
>>>   }
>>> )
>>>
>>> summary.count
>>>
>>> AND
>>>
>>> rowStructText.map(s => {
>>>     println(s)
>>>     s.split(",").size
>>> }).countByValue foreach println
>>>
>>> DOES NOT PRINT THE OUTPUT.
>>>
>>> When I open up the Spark history server, it does not launch new Spark
>>> jobs for countByValue. Why is that, and when does it actually start a
>>> new job?
>>>
>>> On Wed, Aug 5, 2015 at 10:19 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>
>>>> summary: org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[285] at
>>>> map at <console>:169 (1,517252)
>>>>
>>>> What does that mean?
>>>>
>>>> On Wed, Aug 5, 2015 at 10:14 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>
>>>>> Your data might have a format issue (fewer fields than you expect).
>>>>>
>>>>> Please try executing the following code to check whether all the
>>>>> lines have 14 fields:
>>>>>
>>>>> rowStructText.map(s => s.split(",").size).countByValue foreach println
>>>>>
>>>>> On Thu, Aug 6, 2015 at 1:01 PM, Randy Gelhausen
>>>>> <rgelhau...@hortonworks.com> wrote:
>>>>>
>>>>>> You likely have a problem with your parsing logic. I can’t see the
>>>>>> data to know for sure, but since Spark is lazily evaluated, it
>>>>>> doesn’t try to run your map until you execute the SQL that applies
>>>>>> it to the data.
>>>>>>
>>>>>> That’s why your first paragraph can run (it’s only defining
>>>>>> metadata), but paragraph 2 throws an error.
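(That lazy-evaluation point also answers the "when does it actually start a new job?" question above: map and filter only record lineage, and a job is launched only when an action runs, whether that is count, countByValue, or the SQL query over the registered table. That is why the parse error surfaces at the action, not where the map is defined. A minimal, self-contained sketch of the distinction, illustrative only and not code from this thread, assuming just the sc provided by the Spark shell or Zeppelin:)

// Transformations are lazy: defining them launches no job and parses nothing.
val lines = sc.parallelize(Seq("1,2,3", "badrow"))
val thirdField = lines.map(s => s.split(",")(2))   // nothing runs yet, even for the short row

// An action is what triggers a job on the executors; this is where a malformed
// row would blow up inside a parsing map. Here the split itself is safe, so it
// simply reports how many fields each line has, along the lines Jeff suggested.
val fieldCounts = lines.map(s => s.split(",").size).countByValue()
fieldCounts.foreach(println)   // prints (3,1) and (1,1), in either order

The same applies to the %sql paragraph: registerTempTable only attaches metadata, so a parsing failure can only show up once the query actually runs.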
>>>>>>
>>>>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>>>> Reply-To: "users@zeppelin.incubator.apache.org"
>>>>>> Date: Thursday, August 6, 2015 at 12:37 AM
>>>>>> To: "users@zeppelin.incubator.apache.org"
>>>>>> Subject: Re: Unable to run count(*)
>>>>>>
>>>>>> %sql
>>>>>> select * from summary
>>>>>>
>>>>>> Throws same error.
>>>>>>
>>>>>> On Wed, Aug 5, 2015 at 9:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>
>>>>>>> Para-1
>>>>>>>
>>>>>>> import java.text.SimpleDateFormat
>>>>>>> import java.util.Calendar
>>>>>>> import java.sql.Date
>>>>>>>
>>>>>>> def formatStringAsDate(dateStr: String) = new java.sql.Date(new
>>>>>>> SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime())
>>>>>>>
>>>>>>> //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
>>>>>>> val rowStructText =
>>>>>>> sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")
>>>>>>>
>>>>>>> case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String,
>>>>>>> f6: Integer, f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
>>>>>>> f12: Integer, f13: Integer, f14: String)
>>>>>>>
>>>>>>> val summary = rowStructText.map(s => s.split(",")).map(
>>>>>>>   {
>>>>>>>     s =>
>>>>>>>       Summary(formatStringAsDate(s(0)),
>>>>>>>         s(1).replaceAll("\"", "").toLong,
>>>>>>>         s(3).replaceAll("\"", "").toLong,
>>>>>>>         s(4).replaceAll("\"", "").toInt,
>>>>>>>         s(5).replaceAll("\"", ""),
>>>>>>>         s(6).replaceAll("\"", "").toInt,
>>>>>>>         formatStringAsDate(s(7)),
>>>>>>>         formatStringAsDate(s(8)),
>>>>>>>         s(9).replaceAll("\"", "").toInt,
>>>>>>>         s(10).replaceAll("\"", "").toInt,
>>>>>>>         s(11).replaceAll("\"", "").toFloat,
>>>>>>>         s(12).replaceAll("\"", "").toInt,
>>>>>>>         s(13).replaceAll("\"", "").toInt,
>>>>>>>         s(14).replaceAll("\"", "")
>>>>>>>       )
>>>>>>>   }
>>>>>>> ).toDF()
>>>>>>>
>>>>>>> summary.registerTempTable("summary")
>>>>>>>
>>>>>>> Output:
>>>>>>>
>>>>>>> import java.text.SimpleDateFormat
>>>>>>> import java.util.Calendar
>>>>>>> import java.sql.Date
>>>>>>> formatStringAsDate: (dateStr: String)java.sql.Date
>>>>>>> rowStructText: org.apache.spark.rdd.RDD[String] =
>>>>>>> /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz
>>>>>>> MapPartitionsRDD[152] at textFile at <console>:100
>>>>>>> defined class Summary
>>>>>>> summary: org.apache.spark.sql.DataFrame = [f1: date, f2: bigint, f3:
>>>>>>> bigint, f4: int, f5: string, f6: int, f7: date, f8: date, f9: int,
>>>>>>> f10: int, f11: float, f12: int, f13: int, f14: string]
>>>>>>>
>>>>>>> Para-2 (DOES NOT WORK)
>>>>>>>
>>>>>>> %sql select count(*) from summary
>>>>>>>
>>>>>>> Output:
>>>>>>>
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>> Task 0 in stage 29.0 failed 4 times, most recent failure: Lost task
>>>>>>> 0.3 in stage 29.0 (TID 1844, datanode-6-3486.phx01.dev.ebayc3.com):
>>>>>>> java.lang.ArrayIndexOutOfBoundsException: 1
>>>>>>>   at $line184.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:109)
>>>>>>>   at $line184.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:107)
>>>>>>>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>   at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
>>>>>>>   at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
>>>>>>>   at org.apache.spark.sql.execution.RDDConversions$$anonfun$productToRowRdd$1.apply(ExistingRDD.scala:42)
>>>>>>>   at org.apache.spark.sql.execution.RDDConversions$$anonfun$productToRowRdd$1.apply(ExistingRDD.scala:37)
>>>>>>>   at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
>>>>>>>   at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
>>>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Suggestions?
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang

--
Deepak
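(One more note on the original error: ArrayIndexOutOfBoundsException: 1 means s.split(",") returned a single-element array for at least one line, so even s(1) was out of range; that matches the one-character lines seen later in the thread. A rough diagnostic sketch, not code from the thread, reusing the rowStructText RDD and the comma delimiter from Para-1, to look at the raw input and the field-count distribution before defining the Summary mapping:)

// Look at a few raw lines first; in this thread they turned out to contain
// the characters of the HDFS path, one character per line.
rowStructText.take(20).foreach(println)

// Distribution of field counts per line; anything below the 15 fields the
// Summary(...) construction reads (indices 0..14) will make it throw.
rowStructText.map(s => s.split(",").length).countByValue().foreach(println)

// Keep a small sample of the short rows for closer inspection.
rowStructText.filter(s => s.split(",").length < 15).take(10).foreach(println)

If the counts show one dominant value plus a handful of outliers, filtering on the field count (or on s.length, as in the fix at the top of the thread) before the map is usually enough.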