------------------ Original Message ------------------
From: <[email protected]>
Sent: Thursday, July 9, 2020, 5:08 PM
To: "godfrey he" <[email protected]>
Subject: pyflink 1.11.0 window
Hello,
Could you tell me how to use a window in pyflink 1.11? When I try, I get the following error:
org.apache.flink.table.api.ValidationException: A group window expects a time
attribute for grouping in a stream environment.
Could you help me understand what is causing this and how to fix it? Thank you very much.

The code is as follows:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table.window import Tumble


def from_kafka_to_kafka_demo():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_parallelism(1)
    # use blink table planner
    st_env = StreamTableEnvironment.create(s_env)
    # register source and sink
    register_rides_source(st_env)
    register_rides_sink(st_env)
    # 1-second tumbling window on time1, written back to Kafka
    st_env.from_path("source1") \
        .window(Tumble.over("1.seconds").on("time1").alias("w")) \
        .group_by("w") \
        .select("id, time1, time1") \
        .insert_into("sink1")
    st_env.execute("2-from_kafka_to_kafka")


def register_rides_source(st_env):
    # Kafka source table registered via DDL (legacy connector properties)
    source_ddl = \
        '''
        create table source1(
            id int,
            time1 timestamp,
            type string
        ) with (
            'connector.type' = 'kafka',
            'update-mode' = 'append',
            'connector.topic' = 'tp1',
            'connector.properties.bootstrap.servers' = 'localhost:9092'
        )
        '''
    st_env.sql_update(source_ddl)


def register_rides_sink(st_env):
    # Kafka sink table registered via DDL (legacy connector properties)
    sink_ddl = \
        '''
        create table sink1(
            id int,
            time1 timestamp,
            time2 timestamp
        ) with (
            'connector.type' = 'kafka',
            'update-mode' = 'append',
            'connector.topic' = 'tp3',
            'connector.properties.bootstrap.servers' = 'localhost:9092'
        )
        '''
    st_env.sql_update(sink_ddl)


if __name__ == '__main__':
    from_kafka_to_kafka_demo()
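
My guess (which I have not verified) is that the error means time1 is read as a plain timestamp column rather than a time attribute, so Tumble.on("time1") has nothing to window on. Below is only a sketch of what I understand a source DDL with an event-time attribute might look like; the watermark clause and the 5-second delay are assumptions of mine, and the field and topic names simply repeat the ones above:

    # hypothetical variant of the source DDL above (not verified):
    # declaring a watermark on time1 should make it a rowtime attribute
    source_ddl_with_rowtime = '''
        create table source1(
            id int,
            time1 timestamp(3),
            type string,
            watermark for time1 as time1 - interval '5' second
        ) with (
            'connector.type' = 'kafka',
            'update-mode' = 'append',
            'connector.topic' = 'tp1',
            'connector.properties.bootstrap.servers' = 'localhost:9092'
        )
    '''

If processing time were enough, adding a computed column such as proctime as PROCTIME() and windowing on that column would be another option, but I am not sure which of the two applies here.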