[
https://issues.apache.org/jira/browse/FLINK-35273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Biao Geng updated FLINK-35273:
--
Description:
The issue is from
https://apache-flink.slack.com/archives/C065944F9M2/p1714134880878399
When using TIMESTAMP_LTZ in PyFlink after setting a different local time zone,
the output does not match the expected result.
Here is my test code:
{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, Configuration
from pyflink.table import DataTypes, StreamTableEnvironment
from datetime import datetime
import pytz

config = Configuration()
config.set_string("python.client.executable",
                  "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
config.set_string("python.executable",
                  "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
env = StreamExecutionEnvironment.get_execution_environment(config)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set_local_timezone("UTC")
# t_env.get_config().set_local_timezone("GMT-08:00")

input_table = t_env.from_elements(
    [
        (
            "elementA",
            datetime(year=2024, month=4, day=12, hour=8, minute=35),
        ),
        (
            "elementB",
            datetime(year=2024, month=4, day=12, hour=8, minute=35,
                     tzinfo=pytz.utc),
            # datetime(year=2024, month=4, day=12, hour=8, minute=35,
            #          tzinfo=pytz.timezone('America/New_York')),
        ),
    ],
    DataTypes.ROW(
        [
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
        ]
    ),
)
input_table.execute().print()

# SQL
sql_result = t_env.execute_sql(
    "CREATE VIEW MyView1 AS SELECT TO_TIMESTAMP_LTZ(171291090, 3);")
t_env.execute_sql(
    "CREATE TABLE Sink (`t` TIMESTAMP_LTZ) WITH ('connector'='print');")
t_env.execute_sql("INSERT INTO Sink SELECT * FROM MyView1;")
{code}
The output is:
{code:java}
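+----+----------+-------------------------+
| op | name     | timestamp               |
+----+----------+-------------------------+
| +I | elementA | 2024-04-12 08:35:00.000 |
| +I | elementB | 2024-04-12 16:35:00.000 |
+----+----------+-------------------------+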
2 rows in set
+I[2024-04-12T08:35:00Z]
{code}
In pyflink/table/types.py, the `LocalZonedTimestampType` class uses the
following logic to convert a Python object to the SQL type:
{code:python}
EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6
...
def to_sql_type(self, dt):
    if dt is not None:
        seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                   else time.mktime(dt.timetuple()))
        return int(seconds) * 10 ** 6 + dt.microsecond + self.EPOCH_ORDINAL
{code}
This shows that EPOCH_ORDINAL is computed once, when the Python VM (PVM)
starts, from the local time zone of the Python process. It is not determined
by the time zone configured via `set_local_timezone`.
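The effect can be reproduced without Flink. The following is a minimal sketch, not the PyFlink implementation: `to_sql_type_under_tz` and `as_utc_text` are hypothetical helpers that re-run the conversion quoted above under a chosen process time zone. Two assumptions are made: the reporter's machine runs at UTC+8 (consistent with the 8-hour shift in the output), and the platform is Unix, since `time.tzset()` is not available on Windows. Unlike PyFlink, the sketch recomputes the epoch offset on every call so that both zones can be compared in one process.

```python
import calendar
import os
import time
from datetime import datetime, timezone


def to_sql_type_under_tz(dt, posix_tz):
    """Re-run the quoted conversion with the process time zone set to posix_tz.

    Hypothetical helper for illustration; PyFlink computes EPOCH_ORDINAL once
    as a class attribute instead of on every call.
    """
    os.environ["TZ"] = posix_tz
    time.tzset()  # Unix only: makes the time module pick up the new TZ value
    epoch_ordinal = calendar.timegm(time.localtime(0)) * 10 ** 6
    seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
               else time.mktime(dt.timetuple()))
    return int(seconds) * 10 ** 6 + dt.microsecond + epoch_ordinal


def as_utc_text(micros):
    """Render microseconds since the epoch as a UTC wall-clock string."""
    return datetime.fromtimestamp(
        micros // 10 ** 6, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


naive = datetime(2024, 4, 12, 8, 35)
aware = datetime(2024, 4, 12, 8, 35, tzinfo=timezone.utc)

# POSIX TZ strings invert the sign: "CST-8" means UTC+8.
for posix_tz in ("UTC0", "CST-8"):
    print(posix_tz,
          "| naive:", as_utc_text(to_sql_type_under_tz(naive, posix_tz)),
          "| aware:", as_utc_text(to_sql_type_under_tz(aware, posix_tz)))
```

Under `UTC0` both values render as 08:35:00. Under `CST-8` the naive value stays at 08:35:00 while the UTC-aware value becomes 16:35:00, which matches the elementA and elementB rows reported above. Nothing in this computation reads the Flink session time zone.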
> PyFlink's LocalZonedTimestampType should respect timezone set by
> set_local_timezone
> ---
>
> Key: FLINK-35273
> URL: https://issues.apache.org/jira/browse/FLINK-35273
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Reporter: Biao Geng
> Priority: Major
>
> The issue is from
> https://apache-flink.slack.com/archives/C065944F9M2/p1714134880878399
> When using TIMESTAMP_LTZ in PyFlink after setting a different local time zone,
> the output does not match the expected result.
> Here is my test code:
> {code:python}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.common import Types, Configuration
> from pyflink.table import DataTypes, StreamTableEnvironment
> from datetime import datetime
> import pytz
> config = Configuration()
> config.set_string("python.client.executable",
>                   "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
> config.set_string("python.executable",
>                   "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> t_env = StreamTableEnvironment.create(env)
> t_env.get_config().set_local_timezone("UTC")
> # t_env.get_config().set_local_timezone("GMT-08:00")
> input_table = t_env.from_elements(
>     [
>         (
>             "elementA",
>             datetime(year=2024, month=4, day=12, hour=8, minute=35),
>         ),
>         (
>             "elementB",
>             datetime(year=2024, month=4, day=12, hour=8, minute=35,
>                      tzinfo=pytz.utc),
>             # datetime(year=2024, month=4, day=12, hour=8, minute=35,
>             #          tzinfo=pytz.timezone('America/New_York')),
>         ),
>     ],
>     DataTypes.ROW(
>         [
>             DataTypes.FIELD("name",