Hi 琴师,
Did you declare time1 as a time attribute in your source DDL? For example:
create table source1(
    id int,
    time1 timestamp(3),
    type string,
    WATERMARK FOR time1 AS time1 - INTERVAL '2' SECOND
) with (...)
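As a concrete sketch (assuming the Kafka connector options from your original mail are unchanged), the full source DDL string with the watermark declared could look like this. Note that in Flink 1.11 an event-time attribute must be of type timestamp(3):

```python
# Sketch: source DDL that declares time1 as an event-time attribute via a
# WATERMARK clause. Connector options are copied from the original mail;
# adjust topic / bootstrap servers for your setup.
SOURCE_DDL_WITH_WATERMARK = '''
create table source1(
    id int,
    time1 timestamp(3),
    type string,
    WATERMARK FOR time1 AS time1 - INTERVAL '2' SECOND
) with (
    'connector.type' = 'kafka',
    'update-mode' = 'append',
    'connector.topic' = 'tp1',
    'connector.properties.bootstrap.servers' = 'localhost:9092'
)
'''

# It is registered the same way as in your code, e.g.:
#   st_env.sql_update(SOURCE_DDL_WITH_WATERMARK)
```

With time1 declared this way, `Tumble...on("time1")` should be accepted as a group window over a time attribute.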
奇怪的不朽琴师 <[email protected]> wrote on Fri, Jul 10, 2020 at 8:43 AM:
> ------------------ Original Message ------------------
> From: "奇怪的不朽琴师" <[email protected]>
> Date: Thu, Jul 9, 2020, 5:08 PM
> To: "godfrey he" <[email protected]>
>
> Subject: pyflink1.11.0window
>
>
>
> Hello,
> With PyFlink 1.11, opening a window still raises the following error:
> org.apache.flink.table.api.ValidationException: A group window expects a
> time attribute for grouping in a stream environment.
>
> Has this issue not been fixed? Or is my usage incorrect? If so, could you
> provide a correct example?
> My code is below.
> Thanks
>
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment
> from pyflink.table.window import Tumble
>
>
> def from_kafka_to_kafka_demo():
>     s_env = StreamExecutionEnvironment.get_execution_environment()
>     s_env.set_parallelism(1)
>
>     # use blink table planner
>     st_env = StreamTableEnvironment.create(s_env)
>
>     # register source and sink
>     register_rides_source(st_env)
>     register_rides_sink(st_env)
>
>     st_env.from_path("source1") \
>         .window(Tumble.over("1.seconds").on("time1").alias("w")) \
>         .group_by("w") \
>         .select("id, time1, time1") \
>         .insert_into("sink1")
>
>     st_env.execute("2-from_kafka_to_kafka")
>
>
>
>
> def register_rides_source(st_env):
>     source_ddl = '''
>         create table source1(
>             id int,
>             time1 timestamp,
>             type string
>         ) with (
>             'connector.type' = 'kafka',
>             'update-mode' = 'append',
>             'connector.topic' = 'tp1',
>             'connector.properties.bootstrap.servers' = 'localhost:9092'
>         )
>     '''
>     st_env.sql_update(source_ddl)
>
>
>
>
> def register_rides_sink(st_env):
>     sink_ddl = '''
>         create table sink1(
>             id int,
>             time1 timestamp,
>             time2 timestamp
>         ) with (
>             'connector.type' = 'kafka',
>             'update-mode' = 'append',
>             'connector.topic' = 'tp3',
>             'connector.properties.bootstrap.servers' = 'localhost:9092'
>         )
>     '''
>     st_env.sql_update(sink_ddl)
>
>
>
>
> if __name__ == '__main__':
>     from_kafka_to_kafka_demo()
>