How do I persist the RDD to HDFS?
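Would something like this be the right approach (untested sketch, assuming Spark 1.4+; the output paths are made up)?

// "summary" here is the DataFrame built further down in this thread.
// As plain text files on HDFS:
summary.rdd.map(_.mkString(",")).saveAsTextFile("/user/zeppelin/summary_out")

// Or, with the schema preserved, as Parquet:
summary.write.parquet("/user/zeppelin/summary_out_parquet")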
On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver <philip.wea...@gmail.com> wrote:

> This message means that java.util.Date is not supported by Spark
> DataFrames. You'll need to use java.sql.Date, I believe.
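> For example, a minimal sketch of that change (untested; only the date
> handling is shown):
>
> import java.text.SimpleDateFormat
>
> // Parse as before, then wrap the epoch millis in a java.sql.Date,
> // which Spark SQL does support (as DateType):
> def formatStringAsDate(dateStr: String): java.sql.Date =
>   new java.sql.Date(new SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime)
>
> // ...and declare f1, f7 and f8 in the case class as java.sql.Date
> // instead of java.util.Date.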
>
> On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> That seems to be working; however, I now see a new exception.
>>
>> Code:
>>
>> def formatStringAsDate(dateStr: String) = new SimpleDateFormat("yyyy-MM-dd").parse(dateStr)
>>
>> //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
>> val rowStructText = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")
>>
>> case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String,
>>   f6: Integer, f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
>>   f12: Integer, f13: Integer, f14: String)
>>
>> val summary = rowStructText.map(s => s.split(",")).map(
>>   s => Summary(formatStringAsDate(s(0)),
>>     s(1).replaceAll("\"", "").toLong,
>>     s(3).replaceAll("\"", "").toLong,
>>     s(4).replaceAll("\"", "").toInt,
>>     s(5).replaceAll("\"", ""),
>>     s(6).replaceAll("\"", "").toInt,
>>     formatStringAsDate(s(7)),
>>     formatStringAsDate(s(8)),
>>     s(9).replaceAll("\"", "").toInt,
>>     s(10).replaceAll("\"", "").toInt,
>>     s(11).replaceAll("\"", "").toFloat,
>>     s(12).replaceAll("\"", "").toInt,
>>     s(13).replaceAll("\"", "").toInt,
>>     s(14).replaceAll("\"", "")
>>   )
>> ).toDF()
>> summary.registerTempTable("summary")
>>
>> //Output
>> import java.text.SimpleDateFormat
>> import java.util.Calendar
>> import java.util.Date
>> formatStringAsDate: (dateStr: String)java.util.Date
>> rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz MapPartitionsRDD[105] at textFile at <console>:60
>> defined class Summary
>> x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at <console>:61
>> java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported
>>   at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)
>>
>> Any suggestions?
>>
>> On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver <philip.wea...@gmail.com> wrote:
>>
>>> The parallelize method does not read the contents of a file. It simply
>>> takes a collection and distributes it to the cluster. In this case, the
>>> String is a collection of 67 characters, so you get an RDD[Char] whose
>>> count is the length of the path string.
>>>
>>> Use sc.textFile instead of sc.parallelize, and it should work as you
>>> want.
>>>
>>> On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>
>>>> I have CSV data stored gzip-compressed on HDFS.
>>>>
>>>> *With Pig*
>>>>
>>>> a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz' using PigStorage();
>>>> b = limit a 10;
>>>>
>>>> (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,,,,,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)
>>>> (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)
>>>>
>>>> However, with Spark:
>>>>
>>>> val rowStructText = sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00000.gz")
>>>>
>>>> val x = rowStructText.map(s => {
>>>>   println(s)
>>>>   s
>>>> })
>>>>
>>>> x.count
>>>>
>>>> Questions
>>>>
>>>> 1) x.count always shows 67, irrespective of the path I pass to sc.parallelize.
>>>>
>>>> 2) It shows x as RDD[Char] instead of RDD[String].
>>>>
>>>> 3) println() never emits the rows.
>>>>
>>>> Any suggestions?
>>>>
>>>> -Deepak
>>>>
>>>> --
>>>> Deepak
>>>
>>
>> --
>> Deepak
>>
>
--
Deepak