I see the Spark job. The println statements print one character per line:

2 0 1 5 / 0 8 / 0 3 / r e g u l a r / p a r t - m ....
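For reference, one common way to get exactly that character-per-line output in Scala (an assumption about the cause here, not something the thread confirms) is iterating over a String directly, since a String is a sequence of Chars:

    // Hypothetical illustration: iterating a String yields its characters,
    // so foreach(println) prints one character per line.
    val path = "2015/08/03/regular/part-m-00003.gz"
    path.foreach(println)   // prints 2, 0, 1, 5, /, 0, 8, ... each on its own line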
On Wed, Aug 5, 2015 at 10:27 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> val summary = rowStructText.map(s => s.split(",")).map(
>   {
>     s =>
>       *println(s)*
>       Summary(formatStringAsDate(s(0)),
>         s(1).replaceAll("\"", "").toLong,
>         s(3).replaceAll("\"", "").toLong,
>         s(4).replaceAll("\"", "").toInt,
>         s(5).replaceAll("\"", ""),
>         s(6).replaceAll("\"", "").toInt,
>         formatStringAsDate(s(7)),
>         formatStringAsDate(s(8)),
>         s(9).replaceAll("\"", "").toInt,
>         s(10).replaceAll("\"", "").toInt,
>         s(11).replaceAll("\"", "").toFloat,
>         s(12).replaceAll("\"", "").toInt,
>         s(13).replaceAll("\"", "").toInt,
>         s(14).replaceAll("\"", "")
>       )
>   }
> )
>
> summary.count
>
> AND
>
> rowStructText.map(s => {
>   *println(s)*
>   s.split(",").size
> }).countByValue foreach println
>
> DOES NOT PRINT THE OUTPUT.
>
> When I open up the Spark history server, it does not launch new SPARK JOBS
> for countByValue. Why is that, and when does it actually start a new job?
>
> On Wed, Aug 5, 2015 at 10:19 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> summary: org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[285] at map
>> at <console>:169 (1,517252)
>>
>> What does that mean?
>>
>> On Wed, Aug 5, 2015 at 10:14 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> Your data might have a format issue (with fewer fields than you expect).
>>>
>>> Please try executing the following code to check whether all the lines
>>> have 14 fields:
>>>
>>>   rowStructText.map(s => s.split(",").size).countByValue foreach println
>>>
>>> On Thu, Aug 6, 2015 at 1:01 PM, Randy Gelhausen <rgelhau...@hortonworks.com> wrote:
>>>
>>>> You likely have a problem with your parsing logic. I can’t see the data
>>>> to know for sure, but since Spark is lazily evaluated, it doesn’t try to
>>>> run your map until you execute the SQL that applies it to the data.
>>>>
>>>> That’s why your first paragraph can run (it’s only defining metadata),
>>>> but paragraph 2 throws an error.
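A minimal sketch of the laziness distinction described above, against the rowStructText RDD defined later in the thread: map is a lazy transformation and schedules no work, while actions such as count, countByValue, or collect are what launch Spark jobs. A println inside a map runs on the executors, so its output goes to the executors' stdout logs (viewable per task from the Spark UI), not to the Zeppelin notebook; to inspect raw lines in the notebook, bring a small sample back to the driver first:

    // Lazy: defining the transformation runs nothing and launches no job.
    val sizes = rowStructText.map(_.split(",").length)

    // Actions launch jobs; this is when tasks (and any println inside them) run.
    sizes.countByValue().foreach(println)

    // println inside map/filter prints on the executors, not in the notebook.
    // To see a few raw lines on the driver, collect a small sample:
    rowStructText.take(5).foreach(println)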
>>>>
>>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>> Reply-To: "users@zeppelin.incubator.apache.org"
>>>> Date: Thursday, August 6, 2015 at 12:37 AM
>>>> To: "users@zeppelin.incubator.apache.org"
>>>> Subject: Re: Unable to run count(*)
>>>>
>>>> %sql
>>>> select * from summary
>>>>
>>>> Throws same error
>>>>
>>>> On Wed, Aug 5, 2015 at 9:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>
>>>>> Para-1
>>>>> import java.text.SimpleDateFormat
>>>>> import java.util.Calendar
>>>>> import java.sql.Date
>>>>>
>>>>> def formatStringAsDate(dateStr: String) = new java.sql.Date(new
>>>>>   SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime())
>>>>>
>>>>> //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
>>>>> val rowStructText =
>>>>>   sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")
>>>>> case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String,
>>>>>   f6: Integer, f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
>>>>>   f12: Integer, f13: Integer, f14: String)
>>>>>
>>>>> val summary = rowStructText.map(s => s.split(",")).map(
>>>>>   {
>>>>>     s =>
>>>>>       Summary(formatStringAsDate(s(0)),
>>>>>         s(1).replaceAll("\"", "").toLong,
>>>>>         s(3).replaceAll("\"", "").toLong,
>>>>>         s(4).replaceAll("\"", "").toInt,
>>>>>         s(5).replaceAll("\"", ""),
>>>>>         s(6).replaceAll("\"", "").toInt,
>>>>>         formatStringAsDate(s(7)),
>>>>>         formatStringAsDate(s(8)),
>>>>>         s(9).replaceAll("\"", "").toInt,
>>>>>         s(10).replaceAll("\"", "").toInt,
>>>>>         s(11).replaceAll("\"", "").toFloat,
>>>>>         s(12).replaceAll("\"", "").toInt,
>>>>>         s(13).replaceAll("\"", "").toInt,
>>>>>         s(14).replaceAll("\"", "")
>>>>>       )
>>>>>   }
>>>>> ).toDF()
>>>>> summary.registerTempTable("summary")
>>>>>
>>>>> Output:
>>>>> import java.text.SimpleDateFormat
>>>>> import java.util.Calendar
>>>>> import java.sql.Date
>>>>> formatStringAsDate: (dateStr: String)java.sql.Date
>>>>> rowStructText: org.apache.spark.rdd.RDD[String] =
>>>>>   /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz
>>>>>   MapPartitionsRDD[152] at textFile at <console>:100
>>>>> defined class Summary
>>>>> summary: org.apache.spark.sql.DataFrame = [f1: date, f2: bigint, f3: bigint,
>>>>>   f4: int, f5: string, f6: int, f7: date, f8: date, f9: int, f10: int,
>>>>>   f11: float, f12: int, f13: int, f14: string]
>>>>>
>>>>> Para-2 (DOES NOT WORK)
>>>>> %sql select count(*) from summary
>>>>>
>>>>> Output:
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task 0 in stage 29.0 failed 4 times, most recent failure: Lost task 0.3 in
>>>>> stage 29.0 (TID 1844, datanode-6-3486.phx01.dev.ebayc3.com):
>>>>> java.lang.ArrayIndexOutOfBoundsException: 1
>>>>>   at $line184.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:109)
>>>>>   at $line184.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:107)
>>>>>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>   at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
>>>>>   at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
>>>>>   at org.apache.spark.sql.execution.RDDConversions$$anonfun$productToRowRdd$1.apply(ExistingRDD.scala:42)
>>>>>   at org.apache.spark.sql.execution.RDDConversions$$anonfun$productToRowRdd$1.apply(ExistingRDD.scala:37)
>>>>>   at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
>>>>>   at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Suggestions ?
>>>>>
>>>>> --
>>>>> Deepak
>>>>
>>>> --
>>>> Deepak
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>
>> --
>> Deepak
>
> --
> Deepak

--
Deepak
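The ArrayIndexOutOfBoundsException: 1 in the trace above indicates that split(",") produced fewer elements than the mapper indexes (it reads s(0) through s(14), so it needs at least 15 fields); a single short or empty line in the .gz file is enough to fail the whole stage. A defensive rewrite of the Para-1 mapping, sketched on the assumption that the rowStructText, Summary and formatStringAsDate definitions from Para-1 are already in scope (the field-count filter, the -1 split limit and the clean helper are additions for illustration, not part of the original job):

    // split(",", -1) keeps trailing empty fields; the filter drops any line
    // that lacks the 15 fields indexed below, which is what currently throws
    // ArrayIndexOutOfBoundsException inside the SQL job.
    val parsed = rowStructText
      .map(_.split(",", -1))
      .filter(_.length >= 15)

    val summary = parsed.map { s =>
      def clean(i: Int) = s(i).replaceAll("\"", "")   // strip embedded quotes
      Summary(formatStringAsDate(s(0)),
        clean(1).toLong, clean(3).toLong, clean(4).toInt,
        clean(5), clean(6).toInt,
        formatStringAsDate(s(7)), formatStringAsDate(s(8)),
        clean(9).toInt, clean(10).toInt, clean(11).toFloat,
        clean(12).toInt, clean(13).toInt, clean(14))
    }.toDF()
    summary.registerTempTable("summary")

    // Count how many lines the field-count filter would drop:
    rowStructText.map(_.split(",", -1)).filter(_.length < 15).count()

This also reports how many malformed lines the file contains instead of aborting the count(*) query.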