I see. As far as I know, the Spark CSV datasource does not support custom date formats, only standard ones such as "2015-08-20 15:57:00".
Internally it uses Timestamp.valueOf() and Date.valueOf() to parse them. As far as I can tell, you can either 1. modify and build the library yourself so that it accepts a custom date/time format (it should not require many changes, just around here: https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala#L62-L64 ), or 2. read the file as an RDD first, parse the date fields into a supported format, and then create a DataFrame with the library (rough sketches of option 2 are at the end of this message).

2015-12-29 16:41 GMT+09:00 Divya Gehlot <divya.htco...@gmail.com>:

> yes I am using spark-csv only
>
> below is the sample code for your reference
>
> 15/12/28 03:34:27 INFO SparkILoop: Created sql context (with Hive support)..
> SQL context available as sqlContext.
>
> scala> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.HiveContext
>
> scala> import org.apache.spark.sql.hive.orc._
> import org.apache.spark.sql.hive.orc._
>
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> 15/12/28 03:34:57 WARN SparkConf: The configuration key 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark 1.3 and may be removed in the future. Please use the new key 'spark.yarn.am.waitTime' instead.
> 15/12/28 03:34:57 INFO HiveContext: Initializing execution hive, version 0.13.1
> hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@3413fbe
>
> scala> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType, LongType, TimestampType, NullType}
> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType, LongType, TimestampType, NullType}
>
> scala> val loandepoSchema = StructType(Seq(
>      | StructField("COLUMN1", StringType, true),
>      | StructField("COLUMN2", StringType, true),
>      | StructField("COLUMN3", TimestampType, true),
>      | StructField("COLUMN4", TimestampType, true),
>      | StructField("COLUMN5", StringType, true),
>      | StructField("COLUMN6", StringType, true),
>      | StructField("COLUMN7", IntegerType, true),
>      | StructField("COLUMN8", IntegerType, true),
>      | StructField("COLUMN9", StringType, true),
>      | StructField("COLUMN10", IntegerType, true),
>      | StructField("COLUMN11", IntegerType, true),
>      | StructField("COLUMN12", IntegerType, true),
>      | StructField("COLUMN13", StringType, true),
>      | StructField("COLUMN14", StringType, true),
>      | StructField("COLUMN15", StringType, true),
>      | StructField("COLUMN16", StringType, true),
>      | StructField("COLUMN17", StringType, true),
>      | StructField("COLUMN18", StringType, true),
>      | StructField("COLUMN19", StringType, true),
>      | StructField("COLUMN20", StringType, true),
>      | StructField("COLUMN21", StringType, true),
>      | StructField("COLUMN22", StringType, true)))
> loandepoSchema: org.apache.spark.sql.types.StructType = StructType(StructField(COLUMN1,StringType,true), StructField(COLUMN2,StringType,true), StructField(COLUMN3,TimestampType,true), StructField(COLUMN4,TimestampType,true), StructField(COLUMN5,StringType,true), StructField(COLUMN6,StringType,true), StructField(COLUMN7,IntegerType,true), StructField(COLUMN8,IntegerType,true), StructField(COLUMN9,StringType,true), StructField(COLUMN10,IntegerType,true), StructField(COLUMN11,IntegerType,true), StructField(COLUMN12,IntegerType,true), StructField(COLUMN13,StringType,true), StructField(COLUMN14,StringType,true), StructField(COLUMN15,StringType,true), StructField(COLUMN16,StringType,true), StructField(COLUMN17,StringType,true), StructField(COLUMN18,StringType,true), StructField(COLUMN19,Strin...
>
> scala> val lonadepodf = hiveContext.read.format("com.databricks.spark.csv").option("header", "true").schema(loandepoSchema).load("/tmp/TestDivya/loandepo_10K.csv")
> 15/12/28 03:37:52 INFO HiveContext: Initializing HiveMetastoreConnection version 0.13.1 using Spark classes.
> lonadepodf: org.apache.spark.sql.DataFrame = [COLUMN1: string, COLUMN2: string, COLUMN3: timestamp, COLUMN4: timestamp, COLUMN5: string, COLUMN6: string, COLUMN7: int, COLUMN8: int, COLUMN9: string, COLUMN10: int, COLUMN11: int, COLUMN12: int, COLUMN13: string, COLUMN14: string, COLUMN15: string, COLUMN16: string, COLUMN17: string, COLUMN18: string, COLUMN19: string, COLUMN20: string, COLUMN21: string, COLUMN22: string]
>
> scala> lonadepodf.select("COLUMN1").show(10)
> 15/12/28 03:38:01 INFO MemoryStore: ensureFreeSpace(216384) called with curMem=0, maxMem=278302556
> 15/12/28 03:38:01 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 211.3 KB, free 265.2 MB)
> ...............................................................................
> 15/12/28 03:38:07 INFO DAGScheduler: ResultStage 2 (show at <console>:33) finished in 0.653 s
> 15/12/28 03:38:07 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
> 15/12/28 03:38:07 INFO DAGScheduler: Job 2 finished: show at <console>:33, took 0.669388 s
> +-------+
> |COLUMN1|
> +-------+
> +-------+
>
> Once the Timestamp StructFields are removed, a result set is returned:
>
> scala> val loandepoSchema = StructType(Seq(
>      | StructField("COLUMN1", StringType, true),
>      | StructField("COLUMN2", StringType, true),
>      | StructField("COLUMN3", StringType, true),
>      | StructField("COLUMN4", StringType, true),
>      | StructField("COLUMN5", StringType, true),
>      | StructField("COLUMN6", StringType, true),
>      | StructField("COLUMN7", IntegerType, true),
>      | StructField("COLUMN8", IntegerType, true),
>      | StructField("COLUMN9", StringType, true),
>      | StructField("COLUMN10", IntegerType, true),
>      | StructField("COLUMN11", IntegerType, true),
>      | StructField("COLUMN12", IntegerType, true),
>      | StructField("COLUMN13", StringType, true),
>      | StructField("COLUMN14", StringType, true),
>      | StructField("COLUMN15", StringType, true),
>      | StructField("COLUMN16", StringType, true),
>      | StructField("COLUMN17", StringType, true),
>      | StructField("COLUMN18", StringType, true),
>      | StructField("COLUMN19", StringType, true),
>      | StructField("COLUMN20", StringType, true),
>      | StructField("COLUMN21", StringType, true),
>      | StructField("COLUMN22", StringType, true)))
> loandepoSchema: org.apache.spark.sql.types.StructType = StructType(StructField(COLUMN1,StringType,true), StructField(COLUMN2,StringType,true), StructField(COLUMN3,StringType,true), StructField(COLUMN4,StringType,true), StructField(COLUMN5,StringType,true), StructField(COLUMN6,StringType,true), StructField(COLUMN7,IntegerType,true), StructField(COLUMN8,IntegerType,true), StructField(COLUMN9,StringType,true), StructField(COLUMN10,IntegerType,true), StructField(COLUMN11,IntegerType,true), StructField(COLUMN12,IntegerType,true), StructField(COLUMN13,StringType,true), StructField(COLUMN14,StringType,true), StructField(COLUMN15,StringType,true), StructField(COLUMN16,StringType,true), StructField(COLUMN17,StringType,true), StructField(COLUMN18,StringType,true), StructField(COLUMN19,StringType,...
>
> scala> val lonadepodf = hiveContext.read.format("com.databricks.spark.csv").option("header", "true").schema(loandepoSchema).load("/tmp/TestDivya/loandepo_10K.csv")
> lonadepodf: org.apache.spark.sql.DataFrame = [COLUMN1: string, COLUMN2: string, COLUMN3: string, COLUMN4: string, COLUMN5: string, COLUMN6: string, COLUMN7: int, COLUMN8: int, COLUMN9: string, COLUMN10: int, COLUMN11: int, COLUMN12: int, COLUMN13: string, COLUMN14: string, COLUMN15: string, COLUMN16: string, COLUMN17: string, COLUMN18: string, COLUMN19: string, COLUMN20: string, COLUMN21: string, COLUMN22: string]
>
> scala> lonadepodf.select("COLUMN1").show(10)
> 15/12/28 03:39:48 INFO BlockManagerInfo: Removed broadcast_8_piece0 on 172.31.20.85:40013 in memory (size: 4.2 KB, free: 265.3 MB)
>
> 15/12/28 03:39:49 INFO YarnScheduler: Removed TaskSet 6.0, whose tasks have all completed, from pool
> 15/12/28 03:39:49 INFO DAGScheduler: Job 6 finished: show at <console>:33, took 0.223277 s
> +-------+
> |COLUMN1|
> +-------+
> |   CTR0|
> |   CTR1|
> |   CTR2|
> |   CTR3|
> |   CTR4|
> |   CTR5|
> |   CTR6|
> |   CTR7|
> |   CTR8|
> |   CTR9|
> +-------+
>
> I would really appreciate it if you could help me create a custom schema for the date columns in my CSV file.
> Following is the data from my CSV file's date columns:
> 5/1/2012 5/2/2015 5/1/2012 10/1/2015 5/3/2014 5/2/2013 5/2/2014 1/1/2013 10/2/2011 5/4/2013
>
> On 29 December 2015 at 15:00, Hyukjin Kwon <gurwls...@gmail.com> wrote:
>
>> Hi Divya,
>>
>> Are you using or have you tried the Spark CSV datasource
>> https://github.com/databricks/spark-csv ?
>>
>> Thanks!
>>
>>
>> 2015-12-28 18:42 GMT+09:00 Divya Gehlot <divya.htco...@gmail.com>:
>>
>>> Hi,
>>> I have an input data set which is a CSV file with date columns.
>>> My output will also be a CSV file, and I will be using this output CSV file for Hive table creation.
>>> I have a few queries:
>>> 1. I tried using a custom schema with Timestamp, but it returns an empty result set when querying the dataframes.
>>> 2. Can I use the String datatype in Spark for the date column, and then define it as a date type while creating the table? My Hive table will be partitioned by the date column.
>>>
>>> I would really appreciate it if you could share some sample code for a timestamp in a DataFrame that can also be used while creating the Hive table.
>>>
>>> Thanks,
>>> Divya
>>>
>>
>
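To illustrate option 2, here is a rough, untested sketch. It assumes your date columns look like 5/1/2012 (i.e. M/d/yyyy), that the file has a header line, and that no field contains a comma inside quotes; the two-column schema below is only an abbreviation of your real 22-column one.

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, TimestampType}

// Abbreviated schema: extend it with the rest of your columns.
val parsedSchema = StructType(Seq(
  StructField("COLUMN1", StringType, true),
  StructField("COLUMN3", TimestampType, true)))

val raw = sc.textFile("/tmp/TestDivya/loandepo_10K.csv")
val header = raw.first()

val rows = raw.filter(_ != header).map { line =>
  // Naive split; it will break if any field contains a comma inside quotes.
  val cols = line.split(",", -1)
  // SimpleDateFormat is not thread-safe, so create one per record (or per partition).
  val fmt = new SimpleDateFormat("M/d/yyyy")
  val ts = if (cols(2).isEmpty) null else new Timestamp(fmt.parse(cols(2)).getTime)
  Row(cols(0), ts)
}

val parsedDf = hiveContext.createDataFrame(rows, parsedSchema)
parsedDf.select("COLUMN1", "COLUMN3").show(10)

Once the dates are parsed, you could also format them back into a string such as yyyy-MM-dd if you want to keep writing CSV output and define the Hive column (or partition column) as a date.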
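A variant of option 2 that keeps the spark-csv reader: load the date columns as StringType (as in your second schema above) and convert them afterwards with a UDF. Again, this is only an untested sketch assuming the M/d/yyyy format; lonadepodf here is the DataFrame loaded with the all-string schema.

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.functions.udf

// Parse an M/d/yyyy string into a Timestamp; return null for empty values.
val toTs = udf { (s: String) =>
  if (s == null || s.isEmpty) null
  else new Timestamp(new SimpleDateFormat("M/d/yyyy").parse(s).getTime)
}

val withDates = lonadepodf.select(
  lonadepodf("COLUMN1"),
  toTs(lonadepodf("COLUMN3")).as("COLUMN3"),
  toTs(lonadepodf("COLUMN4")).as("COLUMN4"))

withDates.show(10)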