I'm reading CSV with a timestamp clearly identified in the UTC timezone,
and I need to store this in a parquet format and eventually read it back
and convert to different timezones as needed.

Sounds straightforward, but this involves some crazy function calls and I'm
seeing strange results as I build a test case.

See my example below.  Why are the values for est_ts and cst_ts the same in
rows 1 and 3 (wrong), but different and correct in row 4?  I have a feeling
it has to do with daylight savings time, but I'm not sure where to resolve
it.

Please note that I'm in the Central timezone.

Is there a better method to do this?


Spark context available as 'sc' (master = local[*], app id =
local-1485539128193).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0

      /_/


Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_60)

Type in expressions to have them evaluated.

Type :help for more information.


scala> :paste

// Entering paste mode (ctrl-D to finish)


import org.apache.spark.sql.Column

def stringts_to_tz(col:Column, tz:String) = {

    from_utc_timestamp(to_utc_timestamp(from_unixtime(unix_timestamp(col,
"yyyy-MM-dd HH:mm:ss Z")), "CST"), tz)

}


val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"), (3L,
"2016-09-14 16:59:57 UTC"), (4L, "2016-11-31 12:00:01 UTC")).toDF("id",
"dts")

val df2 = df.withColumn("created_at", unix_timestamp($"dts", "yyyy-MM-dd
HH:mm:ss Z").cast("timestamp"))

    .withColumn("unix_ts", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z"))

    .withColumn("local_hour", hour($"created_at"))

    .withColumn("s2", from_unixtime($"unix_ts"))

    .withColumn("s3", to_utc_timestamp($"s2", "CST"))

    .withColumn("s4", from_utc_timestamp($"s3", "EST"))

    .withColumn("utc_ts", stringts_to_tz($"dts", "UTC"))

    .withColumn("est_ts", stringts_to_tz($"dts", "CST"))

    .withColumn("cst_ts", stringts_to_tz($"dts", "EST"))

df2.show(4,false)

df2.printSchema



// Exiting paste mode, now interpreting.


+---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+

|id |dts                    |created_at           |unix_ts   |local_hour|s2
                |s3                   |s4                   |utc_ts
      |est_ts               |cst_ts               |

+---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+

|1  |2016-09-14 16:46:32 UTC|2016-09-14 11:46:32.0|1473871592|11
 |2016-09-14 11:46:32|2016-09-14 16:46:32.0|2016-09-14
11:46:32.0|2016-09-14 16:46:32.0|2016-09-14 11:46:32.0|2016-09-14
11:46:32.0|

|2  |not a timestamp        |null                 |null      |null
 |null               |null                 |null                 |null
            |null                 |null                 |

|3  |2016-09-14 16:59:57 UTC|2016-09-14 11:59:57.0|1473872397|11
 |2016-09-14 11:59:57|2016-09-14 16:59:57.0|2016-09-14
11:59:57.0|2016-09-14 16:59:57.0|2016-09-14 11:59:57.0|2016-09-14
11:59:57.0|

|4  |2016-11-31 12:00:01 UTC|2016-12-01 06:00:01.0|1480593601|6
|2016-12-01 06:00:01|2016-12-01 12:00:01.0|2016-12-01 07:00:01.0|2016-12-01
12:00:01.0|2016-12-01 06:00:01.0|2016-12-01 07:00:01.0|

+---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+


root

 |-- id: long (nullable = false)

 |-- dts: string (nullable = true)

 |-- created_at: timestamp (nullable = true)

 |-- unix_ts: long (nullable = true)

 |-- local_hour: integer (nullable = true)

 |-- s2: string (nullable = true)

 |-- s3: timestamp (nullable = true)

 |-- s4: timestamp (nullable = true)

 |-- utc_ts: timestamp (nullable = true)

 |-- est_ts: timestamp (nullable = true)

 |-- cst_ts: timestamp (nullable = true)


import org.apache.spark.sql.Column

stringts_to_tz: (col: org.apache.spark.sql.Column, tz:
String)org.apache.spark.sql.Column

df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]

df2: org.apache.spark.sql.DataFrame = [id: bigint, dts: string ... 9 more
fields]


scala>

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake <http://www.MailLaunder.com/>
800-733-2143

Reply via email to