Re: Converting timezones in Spark

2017-01-31 Thread Don Drake
So, to follow up on this.

A few lessons learned: when you print a timestamp, Spark will only show the
date/time in your current (session) timezone, regardless of any conversions
you applied to it.

The trick is to cast it to a Long; the Java 8 java.time.* classes can then
translate that epoch value to any timezone and generate a string representing
the timestamp.
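
To see the java.time piece in isolation (plain Scala, no Spark), something
like this sketch illustrates the idea; the epoch value corresponds to row 1's
timestamp (2016-09-14 16:46:32 UTC), and the expected output is shown only
for reference:

import java.time.{Instant, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z")

// 1473871592L is 2016-09-14 16:46:32 UTC (row 1 in the example below)
val instant = Instant.ofEpochSecond(1473871592L)

// The same instant, rendered in two different zones:
ZonedDateTime.ofInstant(instant, ZoneId.of("UTC")).format(formatter)
// 2016-09-14 16:46:32 UTC
ZonedDateTime.ofInstant(instant, ZoneId.of("America/New_York")).format(formatter)
// 2016-09-14 12:46:32 EDT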

Here's a working example:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import java.time.Instant
import java.time.ZonedDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf

def convertToTZ(col: Long, zone: String, formatter: DateTimeFormatter): String = {

  // Treat the Long as epoch seconds, then render that instant in the target zone.
  val i = Instant.ofEpochSecond(col)
  val z = ZonedDateTime.ofInstant(i, ZoneId.of(zone))

  z.format(formatter)

}

def convertToTZFullTimestamp = udf((col: Long, zone: String) =>
  convertToTZ(col, zone, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z")))

val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"),
  (3L, "2016-09-14 16:59:57 UTC"), (4L, "2016-11-30 12:00:01 UTC")).toDF("id", "dts")

val df2 = df
  .withColumn("created_at", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z").cast("timestamp"))
  .withColumn("EST_tz", convertToTZFullTimestamp($"created_at".cast("long"), lit("America/New_York")))

df2.show(4, false)


// Exiting paste mode, now interpreting.

+---+-----------------------+---------------------+-----------------------+
|id |dts                    |created_at           |EST_tz                 |
+---+-----------------------+---------------------+-----------------------+
|1  |2016-09-14 16:46:32 UTC|2016-09-14 11:46:32.0|2016-09-14 12:46:32 EDT|
|2  |not a timestamp        |null                 |null                   |
|3  |2016-09-14 16:59:57 UTC|2016-09-14 11:59:57.0|2016-09-14 12:59:57 EDT|
|4  |2016-11-30 12:00:01 UTC|2016-11-30 06:00:01.0|2016-11-30 07:00:01 EST|
+---+-----------------------+---------------------+-----------------------+

import java.time.Instant
import java.time.ZonedDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf
convertToTZ: (col: Long, zone: String, formatter: java.time.format.DateTimeFormatter)String
convertToTZFullTimestamp: org.apache.spark.sql.expressions.UserDefinedFunction
df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]
df2: org.apache.spark.sql.DataFrame = [id: bigint, dts: string ... 2 more fields]

scala>
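
Since the UDF just takes an epoch value and an IANA zone id, the same column
can be rendered in as many zones as you need. Something like this should work
as a follow-on to df2 above (the extra column names are arbitrary and the
output isn't shown here):

val df3 = df2
  .withColumn("CST_tz", convertToTZFullTimestamp($"created_at".cast("long"), lit("America/Chicago")))
  .withColumn("UTC_tz", convertToTZFullTimestamp($"created_at".cast("long"), lit("UTC")))

df3.select("id", "EST_tz", "CST_tz", "UTC_tz").show(4, false)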



Converting timezones in Spark

2017-01-27 Thread Don Drake
I'm reading a CSV with a timestamp clearly identified as being in the UTC
timezone, and I need to store this in Parquet format and eventually read it
back and convert it to different timezones as needed.

Sounds straightforward, but this involves some crazy function calls, and I'm
seeing strange results as I build a test case.

See my example below.  Why are the values for est_ts and cst_ts the same in
rows 1 and 3 (wrong), but different and correct in row 4?  I have a feeling
it has to do with daylight saving time, but I'm not sure where to resolve
it.

Please note that I'm in the Central timezone.

Is there a better method to do this?


Spark context available as 'sc' (master = local[*], app id =
local-1485539128193).

Spark session available as 'spark'.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/


Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_60)

Type in expressions to have them evaluated.

Type :help for more information.


scala> :paste

// Entering paste mode (ctrl-D to finish)


import org.apache.spark.sql.Column

def stringts_to_tz(col: Column, tz: String) = {
  from_utc_timestamp(to_utc_timestamp(from_unixtime(
    unix_timestamp(col, "yyyy-MM-dd HH:mm:ss Z")), "CST"), tz)
}


val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"),
  (3L, "2016-09-14 16:59:57 UTC"), (4L, "2016-11-31 12:00:01 UTC")).toDF("id", "dts")

val df2 = df
  .withColumn("created_at", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z").cast("timestamp"))
  .withColumn("unix_ts", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z"))
  .withColumn("local_hour", hour($"created_at"))
  .withColumn("s2", from_unixtime($"unix_ts"))
  .withColumn("s3", to_utc_timestamp($"s2", "CST"))
  .withColumn("s4", from_utc_timestamp($"s3", "EST"))
  .withColumn("utc_ts", stringts_to_tz($"dts", "UTC"))
  .withColumn("est_ts", stringts_to_tz($"dts", "CST"))
  .withColumn("cst_ts", stringts_to_tz($"dts", "EST"))

df2.show(4,false)

df2.printSchema



// Exiting paste mode, now interpreting.


+---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|id |dts                    |created_at           |unix_ts   |local_hour|s2                 |s3                   |s4                   |utc_ts               |est_ts               |cst_ts               |
+---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|1  |2016-09-14 16:46:32 UTC|2016-09-14 11:46:32.0|1473871592|11        |2016-09-14 11:46:32|2016-09-14 16:46:32.0|2016-09-14 11:46:32.0|2016-09-14 16:46:32.0|2016-09-14 11:46:32.0|2016-09-14 11:46:32.0|
|2  |not a timestamp        |null                 |null      |null      |null               |null                 |null                 |null                 |null                 |null                 |
|3  |2016-09-14 16:59:57 UTC|2016-09-14 11:59:57.0|1473872397|11        |2016-09-14 11:59:57|2016-09-14 16:59:57.0|2016-09-14 11:59:57.0|2016-09-14 16:59:57.0|2016-09-14 11:59:57.0|2016-09-14 11:59:57.0|
|4  |2016-11-31 12:00:01 UTC|2016-12-01 06:00:01.0|1480593601|6         |2016-12-01 06:00:01|2016-12-01 12:00:01.0|2016-12-01 07:00:01.0|2016-12-01 12:00:01.0|2016-12-01 06:00:01.0|2016-12-01 07:00:01.0|
+---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+


root

 |-- id: long (nullable = false)

 |-- dts: string (nullable = true)

 |-- created_at: timestamp (nullable = true)

 |-- unix_ts: long (nullable = true)

 |-- local_hour: integer (nullable = true)

 |-- s2: string (nullable = true)

 |-- s3: timestamp (nullable = true)

 |-- s4: timestamp (nullable = true)

 |-- utc_ts: timestamp (nullable = true)

 |-- est_ts: timestamp (nullable = true)

 |-- cst_ts: timestamp (nullable = true)


import org.apache.spark.sql.Column

stringts_to_tz: (col: org.apache.spark.sql.Column, tz: String)org.apache.spark.sql.Column

df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]

df2: org.apache.spark.sql.DataFrame = [id: bigint, dts: string ... 9 more fields]


scala>

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143