So, to follow up on this, here are a few lessons learned. When you print a timestamp, Spark renders the date/time in your current (local) timezone, regardless of any conversions you applied to it.
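To illustrate that point outside of Spark: an epoch-seconds value identifies a single instant with no timezone attached, and only the formatting step picks a zone. Here is a minimal plain-Scala sketch using java.time. The object and method names are hypothetical and are not from the post, and `Locale.US` is an assumption so that abbreviations such as EDT/CDT print in English. It renders the row 1 `unix_ts` value from the quoted output (1473871592, i.e. 2016-09-14 16:46:32 UTC):

```scala
import java.time.{Instant, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter
import java.util.Locale

// Hypothetical demo object; not part of the original post.
object SameInstantDemo {
  // Locale.US is an assumption so zone abbreviations print in English.
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z", Locale.US)

  // Format one epoch-seconds value as wall-clock time in the given region ID.
  def render(epochSeconds: Long, zone: String): String =
    ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), ZoneId.of(zone)).format(fmt)

  def main(args: Array[String]): Unit = {
    val unixTs = 1473871592L // 2016-09-14 16:46:32 UTC
    // Same instant, three different wall-clock renderings.
    Seq("UTC", "America/Chicago", "America/New_York").foreach(z => println(render(unixTs, z)))
  }
}
```

The underlying value never changes. Only the zone handed to the formatter does, which is what the Spark example relies on.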
The trick is to cast it to a Long (seconds since the epoch). The Java 8 java.time.* classes can then translate it to any timezone and produce a string representing the timestamp. Here's a working example:

scala> :paste
// Entering paste mode (ctrl-D to finish)

// spark-shell already imports spark.implicits._ ($"...", toDF)
// and org.apache.spark.sql.functions._ (unix_timestamp, lit).
import java.time.Instant
import java.time.ZonedDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf

// Treat col as seconds since the epoch and format it as wall-clock time in zone.
def convertToTZ(col: Long, zone: String, formatter: DateTimeFormatter): String = {
  val i = Instant.ofEpochSecond(col)
  val z = ZonedDateTime.ofInstant(i, ZoneId.of(zone))
  z.format(formatter)
}

// Wrap convertToTZ as a Spark UDF with a fixed output pattern.
def convertToTZFullTimestamp = udf((col: Long, zone: String) =>
  convertToTZ(col, zone, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z"))
)

val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"),
  (3L, "2016-09-14 16:59:57 UTC"), (4L, "2016-11-30 12:00:01 UTC")).toDF("id", "dts")

// Parse to a timestamp, then cast back to Long seconds before handing it to the UDF.
val df2 = df.withColumn("created_at", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z").cast("timestamp"))
  .withColumn("EST_tz", convertToTZFullTimestamp($"created_at".cast("long"), lit("America/New_York")))

df2.show(4, false)

// Exiting paste mode, now interpreting.
+---+-----------------------+---------------------+-----------------------+
|id |dts                    |created_at           |EST_tz                 |
+---+-----------------------+---------------------+-----------------------+
|1  |2016-09-14 16:46:32 UTC|2016-09-14 11:46:32.0|2016-09-14 12:46:32 EDT|
|2  |not a timestamp        |null                 |null                   |
|3  |2016-09-14 16:59:57 UTC|2016-09-14 11:59:57.0|2016-09-14 12:59:57 EDT|
|4  |2016-11-30 12:00:01 UTC|2016-11-30 06:00:01.0|2016-11-30 07:00:01 EST|
+---+-----------------------+---------------------+-----------------------+

import java.time.Instant
import java.time.ZonedDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf
convertToTZ: (col: Long, zone: String, formatter: java.time.format.DateTimeFormatter)String
convertToTZFullTimestamp: org.apache.spark.sql.expressions.UserDefinedFunction
df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]
df2: org.apache.spark.sql.DataFrame = [id: bigint, dts: string ... 2 more fields]

scala>

On Fri, Jan 27, 2017 at 12:01 PM, Don Drake <dondr...@gmail.com> wrote:

> I'm reading CSV with a timestamp clearly identified in the UTC timezone,
> and I need to store this in Parquet format and eventually read it back
> and convert it to different timezones as needed.
>
> Sounds straightforward, but this involves some crazy function calls and
> I'm seeing strange results as I build a test case.
>
> See my example below. Why are the values for est_ts and cst_ts the same
> in rows 1 and 3 (wrong), but different and correct in row 4? I have a
> feeling it has to do with daylight saving time, but I'm not sure where to
> resolve it.
>
> Please note that I'm in the Central timezone.
>
> Is there a better method to do this?
>
> Spark context available as 'sc' (master = local[*], app id = local-1485539128193).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
>       /_/
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
>
> import org.apache.spark.sql.Column
>
> def stringts_to_tz(col:Column, tz:String) = {
>   from_utc_timestamp(to_utc_timestamp(from_unixtime(unix_timestamp(col, "yyyy-MM-dd HH:mm:ss Z")), "CST"), tz)
> }
>
> val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"),
>   (3L, "2016-09-14 16:59:57 UTC"), (4L, "2016-11-31 12:00:01 UTC")).toDF("id", "dts")
>
> val df2 = df.withColumn("created_at", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z").cast("timestamp"))
>   .withColumn("unix_ts", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z"))
>   .withColumn("local_hour", hour($"created_at"))
>   .withColumn("s2", from_unixtime($"unix_ts"))
>   .withColumn("s3", to_utc_timestamp($"s2", "CST"))
>   .withColumn("s4", from_utc_timestamp($"s3", "EST"))
>   .withColumn("utc_ts", stringts_to_tz($"dts", "UTC"))
>   .withColumn("est_ts", stringts_to_tz($"dts", "CST"))
>   .withColumn("cst_ts", stringts_to_tz($"dts", "EST"))
>
> df2.show(4,false)
> df2.printSchema
>
> // Exiting paste mode, now interpreting.
>
> +---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
> |id |dts                    |created_at           |unix_ts   |local_hour|s2                 |s3                   |s4                   |utc_ts               |est_ts               |cst_ts               |
> +---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
> |1  |2016-09-14 16:46:32 UTC|2016-09-14 11:46:32.0|1473871592|11        |2016-09-14 11:46:32|2016-09-14 16:46:32.0|2016-09-14 11:46:32.0|2016-09-14 16:46:32.0|2016-09-14 11:46:32.0|2016-09-14 11:46:32.0|
> |2  |not a timestamp        |null                 |null      |null      |null               |null                 |null                 |null                 |null                 |null                 |
> |3  |2016-09-14 16:59:57 UTC|2016-09-14 11:59:57.0|1473872397|11        |2016-09-14 11:59:57|2016-09-14 16:59:57.0|2016-09-14 11:59:57.0|2016-09-14 16:59:57.0|2016-09-14 11:59:57.0|2016-09-14 11:59:57.0|
> |4  |2016-11-31 12:00:01 UTC|2016-12-01 06:00:01.0|1480593601|6         |2016-12-01 06:00:01|2016-12-01 12:00:01.0|2016-12-01 07:00:01.0|2016-12-01 12:00:01.0|2016-12-01 06:00:01.0|2016-12-01 07:00:01.0|
> +---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
>
> root
>  |-- id: long (nullable = false)
>  |-- dts: string (nullable = true)
>  |-- created_at: timestamp (nullable = true)
>  |-- unix_ts: long (nullable = true)
>  |-- local_hour: integer (nullable = true)
>  |-- s2: string (nullable = true)
>  |-- s3: timestamp (nullable = true)
>  |-- s4: timestamp (nullable = true)
>  |-- utc_ts: timestamp (nullable = true)
>  |-- est_ts: timestamp (nullable = true)
>  |-- cst_ts: timestamp (nullable = true)
>
> import org.apache.spark.sql.Column
> stringts_to_tz: (col: org.apache.spark.sql.Column, tz: String)org.apache.spark.sql.Column
> df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]
> df2: org.apache.spark.sql.DataFrame = [id: bigint, dts: string ... 9 more fields]
>
> scala>
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake
> 800-733-2143
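On the original question of why est_ts and cst_ts match in rows 1 and 3 but differ in row 4: as far as I can tell, Spark 2.1 resolves the zone names passed to from_utc_timestamp/to_utc_timestamp through java.util.TimeZone. In that API, "EST" is a fixed UTC-05:00 offset with no daylight saving time, while "CST" follows the America/Chicago rules. In mid-September Chicago is on CDT, which is also UTC-05:00, so both conversions shift by the same amount. By December Chicago is back on UTC-06:00, and the two columns diverge by an hour. Note also that in the quoted code est_ts is computed with "CST" and cst_ts with "EST". This is why a region ID such as "America/New_York" is the safer choice. The following plain-Scala sketch has a hypothetical object name and no Spark dependency. It checks those offsets using the row 1 and row 4 unix_ts values from the quoted output, reading the three-letter names through java.time's ZoneId.SHORT_IDS alias table:

```scala
import java.time.{Instant, ZoneId, ZoneOffset}
import java.util.TimeZone

// Hypothetical demo object; not part of the original post.
object EstVsCstDemo {
  // UTC offsets of the "EST" and "CST" short IDs at a given instant, using
  // ZoneId.SHORT_IDS ("EST" -> "-05:00", "CST" -> "America/Chicago").
  def offsets(epochSeconds: Long): (ZoneOffset, ZoneOffset) = {
    val instant = Instant.ofEpochSecond(epochSeconds)
    val est = ZoneId.of("EST", ZoneId.SHORT_IDS).getRules.getOffset(instant)
    val cst = ZoneId.of("CST", ZoneId.SHORT_IDS).getRules.getOffset(instant)
    (est, cst)
  }

  def main(args: Array[String]): Unit = {
    // The legacy java.util.TimeZone view of the same two names.
    println(s"EST observes DST: ${TimeZone.getTimeZone("EST").useDaylightTime()}")
    println(s"CST observes DST: ${TimeZone.getTimeZone("CST").useDaylightTime()}")
    println(offsets(1473871592L)) // row 1: 2016-09-14 16:46:32 UTC
    println(offsets(1480593601L)) // row 4: 2016-12-01 12:00:01 UTC
  }
}
```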