Figured it out

val summary = rowStructText.filter(s => s.length != 1).map(s => s.split("\t"))

AND

select * from summary now shows the table
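
For the record, a sketch of how the full corrected paragraph could look. It
reuses the Summary case class and formatStringAsDate from my original
paragraph further down; the tab delimiter and the length-1 filter are the fix
above, and the size check is just an extra safety net against short rows,
adjust or drop it as you see fit:

val summary = rowStructText
  .filter(s => s.length != 1)        // drop the stray one-character lines
  .map(s => s.split("\t"))           // tab-separated, per the fix above
  .filter(s => s.size >= 15)         // safety net: skip rows too short to index s(14)
  .map(s => Summary(formatStringAsDate(s(0)),
                    s(1).replaceAll("\"", "").toLong,
                    s(3).replaceAll("\"", "").toLong,
                    s(4).replaceAll("\"", "").toInt,
                    s(5).replaceAll("\"", ""),
                    s(6).replaceAll("\"", "").toInt,
                    formatStringAsDate(s(7)),
                    formatStringAsDate(s(8)),
                    s(9).replaceAll("\"", "").toInt,
                    s(10).replaceAll("\"", "").toInt,
                    s(11).replaceAll("\"", "").toFloat,
                    s(12).replaceAll("\"", "").toInt,
                    s(13).replaceAll("\"", "").toInt,
                    s(14).replaceAll("\"", "")))
  .toDF()                            // same toDF()/registerTempTable as in the original paragraph
summary.registerTempTable("summary")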

On Wed, Aug 5, 2015 at 10:37 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> For some reason the path of the HDFS file is coming up in the data I am reading.
>
>
> rowStructText.filter(s => s.length != 1).map(s => {
>     println(s)
>     s.split("\t").size
> }).countByValue foreach println
>
> However, the output of println() on the executors still shows the
> characters of the HDFS file path, one character per line.
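>
> (For anyone hitting the same thing: a quick way to look at the raw lines on
> the driver, instead of digging through executor logs for println output, is
> something like the following; take(20) is just an arbitrary small sample.)
>
> rowStructText.take(20).foreach(println)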
>
> On Wed, Aug 5, 2015 at 10:30 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
>> I see the Spark job.
>>
>> The println statements have one character per line:
>>
>> 2
>> 0
>> 1
>> 5
>> /
>> 0
>> 8
>> /
>> 0
>> 3
>> /
>> r
>> e
>> g
>> u
>> l
>> a
>> r
>> /
>> p
>> a
>> r
>> t
>> -
>> m
>>
>>
>> ....
>>
>>
>> On Wed, Aug 5, 2015 at 10:27 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>> wrote:
>>
>>> val summary  = rowStructText.map(s => s.split(",")).map(
>>>     {
>>>     s =>
>>>     println(s)
>>>     Summary(formatStringAsDate(s(0)),
>>>             s(1).replaceAll("\"", "").toLong,
>>>             s(3).replaceAll("\"", "").toLong,
>>>             s(4).replaceAll("\"", "").toInt,
>>>             s(5).replaceAll("\"", ""),
>>>             s(6).replaceAll("\"", "").toInt,
>>>             formatStringAsDate(s(7)),
>>>             formatStringAsDate(s(8)),
>>>             s(9).replaceAll("\"", "").toInt,
>>>             s(10).replaceAll("\"", "").toInt,
>>>             s(11).replaceAll("\"", "").toFloat,
>>>             s(12).replaceAll("\"", "").toInt,
>>>             s(13).replaceAll("\"", "").toInt,
>>>             s(14).replaceAll("\"", "")
>>>         )
>>>     }
>>> )
>>>
>>> summary.count
>>>
>>> AND
>>>
>>> rowStructText.map(s => {
>>>     println(s)
>>>     s.split(",").size
>>> }).countByValue foreach println
>>>
>>>
>>> DOES NOT PRINT THE OUTPUT.
>>>
>>> When I open up the Spark history server, I see that it does not launch new
>>> Spark jobs for countByValue. Why is that, and when does it actually start a
>>> new job?
>>>
>>>
>>> On Wed, Aug 5, 2015 at 10:19 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>> wrote:
>>>
>>>> summary: org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[285] at
>>>> map at <console>:169 (1,517252)
>>>>
>>>> What does that mean?
>>>>
>>>> On Wed, Aug 5, 2015 at 10:14 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>
>>>>> Your data might have a format issue (with fewer fields than you expect).
>>>>>
>>>>> Please try executing the following code to check whether all the lines
>>>>> have 14 fields:
>>>>>        rowStructText.map(s => s.split(",").size).countByValue foreach println
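>>>>>
>>>>> If the counts do show lines with fewer fields, a follow-up along these
>>>>> lines (just a suggestion, not specific to your data) would print a few of
>>>>> the offending lines so you can see what they actually contain:
>>>>>
>>>>>        rowStructText.filter(s => s.split(",").size < 14).take(5).foreach(println)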
>>>>>
>>>>> On Thu, Aug 6, 2015 at 1:01 PM, Randy Gelhausen <
>>>>> rgelhau...@hortonworks.com> wrote:
>>>>>
>>>>>> You likely have a problem with your parsing logic. I can’t see the
>>>>>> data to know for sure, but since Spark is lazily evaluated, it doesn’t 
>>>>>> try
>>>>>> to run your map until you execute the SQL that applies it to the
>>>>>> data.
>>>>>>
>>>>>> That’s why your first paragraph can run (it’s only defining
>>>>>> metadata), but paragraph 2 throws an error.
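>>>>>>
>>>>>> A tiny illustration of that laziness (the RDD name is just the one from
>>>>>> this thread; any RDD behaves the same way):
>>>>>>
>>>>>> val parsed = rowStructText.map(_.split(","))  // transformation: lazy, no job runs yet
>>>>>> parsed.count()                                // action: this is what launches a Spark job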
>>>>>>
>>>>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>>>> Reply-To: "users@zeppelin.incubator.apache.org"
>>>>>> Date: Thursday, August 6, 2015 at 12:37 AM
>>>>>> To: "users@zeppelin.incubator.apache.org"
>>>>>> Subject: Re: Unable to run count(*)
>>>>>>
>>>>>> %sql
>>>>>> select * from summary
>>>>>>
>>>>>> Throws the same error.
>>>>>>
>>>>>> On Wed, Aug 5, 2015 at 9:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Para-1
>>>>>>> import java.text.SimpleDateFormat
>>>>>>> import java.util.Calendar
>>>>>>> import java.sql.Date
>>>>>>>
>>>>>>> def formatStringAsDate(dateStr: String) = new java.sql.Date(new
>>>>>>> SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime())
>>>>>>>
>>>>>>>
>>>>>>> //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
>>>>>>> val rowStructText =
>>>>>>> sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")
>>>>>>> case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 :
>>>>>>> String, f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, 
>>>>>>> f11:
>>>>>>> Float, f12: Integer, f13: Integer, f14: String)
>>>>>>>
>>>>>>> val summary  = rowStructText.map(s => s.split(",")).map(
>>>>>>>     {
>>>>>>>     s =>
>>>>>>>     Summary(formatStringAsDate(s(0)),
>>>>>>>             s(1).replaceAll("\"", "").toLong,
>>>>>>>             s(3).replaceAll("\"", "").toLong,
>>>>>>>             s(4).replaceAll("\"", "").toInt,
>>>>>>>             s(5).replaceAll("\"", ""),
>>>>>>>             s(6).replaceAll("\"", "").toInt,
>>>>>>>             formatStringAsDate(s(7)),
>>>>>>>             formatStringAsDate(s(8)),
>>>>>>>             s(9).replaceAll("\"", "").toInt,
>>>>>>>             s(10).replaceAll("\"", "").toInt,
>>>>>>>             s(11).replaceAll("\"", "").toFloat,
>>>>>>>             s(12).replaceAll("\"", "").toInt,
>>>>>>>             s(13).replaceAll("\"", "").toInt,
>>>>>>>             s(14).replaceAll("\"", "")
>>>>>>>         )
>>>>>>>     }
>>>>>>> ).toDF()
>>>>>>> summary.registerTempTable("summary")
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Output:
>>>>>>> import java.text.SimpleDateFormat import java.util.Calendar import
>>>>>>> java.sql.Date formatStringAsDate: (dateStr: String)java.sql.Date
>>>>>>> rowStructText: org.apache.spark.rdd.RDD[String] =
>>>>>>> /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz
>>>>>>> MapPartitionsRDD[152] at textFile at <console>:100 defined class Summary
>>>>>>> summary: org.apache.spark.sql.DataFrame = [f1: date, f2: bigint, f3:
>>>>>>> bigint, f4: int, f5: string, f6: int, f7: date, f8: date, f9: int, f10:
>>>>>>> int, f11: float, f12: int, f13: int, f14: string]
>>>>>>>
>>>>>>>
>>>>>>> Para-2 (DOES NOT WORK)
>>>>>>> %sql select count(*) from summary
>>>>>>>
>>>>>>> Output
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>> Task 0 in stage 29.0 failed 4 times, most recent failure: Lost task 0.3 
>>>>>>> in
>>>>>>> stage 29.0 (TID 1844, datanode-6-3486.phx01.dev.ebayc3.com):
>>>>>>> java.lang.ArrayIndexOutOfBoundsException: 1 at
>>>>>>> $line184.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:109)
>>>>>>> at
>>>>>>> $line184.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:107)
>>>>>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
>>>>>>> scala.collection.Iterator$$anon$1.next(Iterator.scala:853) at
>>>>>>> scala.collection.Iterator$$anon$1.head(Iterator.scala:840) at
>>>>>>> org.apache.spark.sql.execution.RDDConversions$$anonfun$productToRowRdd$1.apply(ExistingRDD.scala:42)
>>>>>>> at
>>>>>>> org.apache.spark.sql.execution.RDDConversions$$anonfun$productToRowRdd$1.apply(ExistingRDD.scala:37)
>>>>>>> at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) at
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) at
>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>  at
>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at
>>>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at
>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>  at
>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at
>>>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at
>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>  at
>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at
>>>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at
>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>  at
>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at
>>>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at
>>>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:64) at
>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>>
>>>>>>> Suggestions?
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Deepak
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Deepak
>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>


-- 
Deepak
