------------------ Original Message ------------------
From: <[email protected]>
Date: Thursday, July 9, 2020, 5:08 PM
To: "godfrey he" <[email protected]>
Subject: pyflink 1.11.0 window
Hi,

When I run a window query with PyFlink 1.11, I get the following error:

org.apache.flink.table.api.ValidationException: A group window expects a time
attribute for grouping in a stream environment.

What is causing this error, and how should I define the time attribute so that the
window can be used? My code is below. Looking forward to your reply, thanks.


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table.window import Tumble


def from_kafka_to_kafka_demo():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_parallelism(1)

    # use blink table planner
    st_env = StreamTableEnvironment.create(s_env)

    # register source and sink
    register_rides_source(st_env)
    register_rides_sink(st_env)

    st_env.from_path("source1") \
        .window(Tumble.over("1.seconds").on("time1").alias("w")) \
        .group_by("w") \
        .select("id, time1, time1") \
        .insert_into("sink1")

    st_env.execute("2-from_kafka_to_kafka")




def register_rides_source(st_env):
    source_ddl = '''
    create table source1(
        id int,
        time1 timestamp,
        type string
    ) with (
        'connector.type' = 'kafka',
        'update-mode' = 'append',
        'connector.topic' = 'tp1',
        'connector.properties.bootstrap.servers' = 'localhost:9092'
    )
    '''
    st_env.sql_update(source_ddl)




def register_rides_sink(st_env):
    sink_ddl = '''
    create table sink1(
        id int,
        time1 timestamp,
        time2 timestamp
    ) with (
        'connector.type' = 'kafka',
        'update-mode' = 'append',
        'connector.topic' = 'tp3',
        'connector.properties.bootstrap.servers' = 'localhost:9092'
    )
    '''
    st_env.sql_update(sink_ddl)




if __name__ == '__main__':
    from_kafka_to_kafka_demo()
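
My current guess is that the error comes from time1 being declared as a plain
timestamp column, so it is not a time attribute. Would declaring a watermark on
time1 in the source DDL, roughly like the sketch below, be the right fix? (The
timestamp(3) precision and the 5-second watermark delay are only my assumptions.)

    create table source1(
        id int,
        time1 timestamp(3),
        type string,
        watermark for time1 as time1 - interval '5' second
    ) with (
        'connector.type' = 'kafka',
        'update-mode' = 'append',
        'connector.topic' = 'tp1',
        'connector.properties.bootstrap.servers' = 'localhost:9092'
    )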


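On the Table API side, if time1 becomes an event-time attribute, my understanding
is that the query would also need to group by the key and read the window bounds
through the window properties, roughly like this (grouping by id and selecting
w.start / w.end are my own assumptions):

    st_env.from_path("source1") \
        .window(Tumble.over("1.seconds").on("time1").alias("w")) \
        .group_by("w, id") \
        .select("id, w.start, w.end") \
        .insert_into("sink1")

Is that the recommended way in PyFlink 1.11, or should the new expression DSL be
used instead?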
