Hi 琴师,

Did you declare time1 as a time attribute in your source DDL?
create table source1(
        id int,
        time1 TIMESTAMP(3),
        type string,
        WATERMARK FOR time1 AS time1 - INTERVAL '2' SECOND
) with (...)
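
For reference, here is a minimal sketch of how the source registration and the window could look once time1 is declared as a rowtime attribute. I copied the connector settings from your code; 'connector.version' = 'universal' and 'format.type' = 'json' are assumptions on my side (the legacy Kafka descriptor needs them), the window size should be "1.seconds" rather than "1.secends", and I group by id as well so it can appear in the select without an aggregate:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table.window import Tumble


def register_rides_source(st_env):
    # time1 is TIMESTAMP(3) with a WATERMARK, so it becomes a rowtime
    # attribute that the group window can use.
    # 'connector.version' and 'format.type' are my assumptions.
    source_ddl = """
    create table source1(
        id int,
        time1 TIMESTAMP(3),
        type string,
        WATERMARK FOR time1 AS time1 - INTERVAL '2' SECOND
    ) with (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'update-mode' = 'append',
        'connector.topic' = 'tp1',
        'connector.properties.bootstrap.servers' = 'localhost:9092',
        'format.type' = 'json'
    )
    """
    st_env.sql_update(source_ddl)


def from_kafka_to_kafka_demo():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_parallelism(1)
    st_env = StreamTableEnvironment.create(s_env)

    register_rides_source(st_env)
    register_rides_sink(st_env)  # your sink registration, unchanged

    # "1.seconds" (not "1.secends"); grouping by id as well so it can be
    # selected without an aggregate. w.start / w.end fill the two sink
    # timestamp columns.
    st_env.from_path("source1") \
        .window(Tumble.over("1.seconds").on("time1").alias("w")) \
        .group_by("w, id") \
        .select("id, w.start, w.end") \
        .insert_into("sink1")

    st_env.execute("2-from_kafka_to_kafka")

With the watermark declared, the "A group window expects a time attribute" error should go away. You may also need to declare time1/time2 in the sink DDL as TIMESTAMP(3) so the query and sink column types line up.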

奇怪的不朽琴师 <[email protected]> wrote on Fri, Jul 10, 2020 at 8:43 AM:

> ------------------ Original Message ------------------
> From: "奇怪的不朽琴师" <[email protected]>
> Sent: Thursday, July 9, 2020, 5:08 PM
> To: "godfrey he" <[email protected]>
>
> Subject: pyflink1.11.0window
>
>
>
> Hello:
> When I use pyflink 1.11, defining a window still raises this error:
> org.apache.flink.table.api.ValidationException: A group window expects a
> time attribute for grouping in a stream environment.
>
> Has this problem not been fixed? Or am I using it the wrong way? If my usage is wrong, could you provide a correct example?
> The code is below.
> Thanks
>
>
> def from_kafka_to_kafka_demo():
>     s_env = StreamExecutionEnvironment.get_execution_environment()
>     s_env.set_parallelism(1)
>
>     # use blink table planner
>     st_env = StreamTableEnvironment.create(s_env)
>
>     # register source and sink
>     register_rides_source(st_env)
>     register_rides_sink(st_env)
>
>     st_env.from_path("source1") \
>         .window(Tumble.over("1.secends").on("time1").alias("w")) \
>         .group_by("w") \
>         .select("id, time1, time1") \
>         .insert_into("sink1")
>
>     st_env.execute("2-from_kafka_to_kafka")
>
>
>
>
> def register_rides_source(st_env):
>     source_ddl = \
>     '''
>     create table source1(
>         id int,
>         time1 timestamp,
>         type string
>     ) with (
>         'connector.type' = 'kafka',
>         'update-mode' = 'append',
>         'connector.topic' = 'tp1',
>         'connector.properties.bootstrap.servers' = 'localhost:9092'
>     )
>     '''
>     st_env.sql_update(source_ddl)
>
>
>
>
> def register_rides_sink(st_env):
>     sink_ddl = \
>     '''
>     create table sink1(
>         id int,
>         time1 timestamp,
>         time2 timestamp
>     ) with (
>         'connector.type' = 'kafka',
>         'update-mode' = 'append',
>         'connector.topic' = 'tp3',
>         'connector.properties.bootstrap.servers' = 'localhost:9092'
>     )
>     '''
>     st_env.sql_update(sink_ddl)
>
>
>
>
> if __name__ == '__main__':
>     from_kafka_to_kafka_demo()
>
