Hi all:
I'm a PyFlink beginner and I've hit a problem where a Flink SQL aggregation produces no output in the sink. My code follows the official documentation closely, but I still can't resolve it, so I'm turning to this list for help.
My source DDL is:
CREATE TABLE random_source (
    amount int,
    course_code string,
    `event_time` TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'ding_broadcast_vip_order_test',
    'properties.bootstrap.servers' = '192.168.100.135:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)
And my sink:
CREATE TABLE print_sink (
    amount BIGINT,
    course_code string,
    `event_time_start` TIMESTAMP(3),
    `event_time_end` TIMESTAMP(3)
) WITH (
    'connector' = 'print'
)
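While trying to isolate the problem, one idea (untested here, and the option names are recalled from the Flink datagen connector docs, so please double-check them) is to swap the Kafka source for the built-in datagen connector. If the same aggregation then prints results, the problem is on the Kafka/format side rather than in the windowing itself:

```sql
-- Hypothetical debugging variant of random_source: same schema and
-- watermark, but self-generating data instead of reading from Kafka.
CREATE TABLE random_source (
    amount int,
    course_code string,
    `event_time` TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.amount.min' = '100',
    'fields.amount.max' = '1000',
    'fields.course_code.length' = '10'
)
```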
I have tried the two aggregation queries below, and both hit the same problem:
insert into print_sink select
    count(amount) as amount,
    course_code,
    tumble_start(event_time, interval '1' minute) as event_time_start,
    tumble_end(event_time, interval '1' minute) as event_time_end
from random_source
group by tumble(event_time, interval '1' minute), course_code
and:
insert into print_sink select
    sum(amount), course_code, window_start, window_end
from table(
    tumble(
        table random_source,
        descriptor(event_time),
        interval '1' minutes
    )
)
group by window_start, window_end, course_code
My Kafka records look like this:
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:44:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:45:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:46:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:47:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:48:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:49:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:50:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:51:40'}
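A side observation, and purely an assumption on my part about where things might go wrong: the records above are written with single quotes, which is not strict JSON, while the source table declares 'format' = 'json'. A quick plain-Python check shows the difference:

```python
import json

# A record in the single-quoted style shown above -- not strict JSON.
single_quoted = (
    "{'amount': 500, 'course_code': '97iscn4g8k',"
    "'event_time':'2021-12-01 17:44:40'}"
)

# The same record serialized as strict JSON (double quotes).
strict = json.dumps({"amount": 500, "course_code": "97iscn4g8k",
                     "event_time": "2021-12-01 17:44:40"})

try:
    json.loads(single_quoted)
    parsed_single = True
except json.JSONDecodeError:
    parsed_single = False

print(parsed_single)                  # False: the single-quoted form is rejected
print(json.loads(strict)["amount"])   # 500: the strict form parses fine
```

If Flink's json format silently drops rows it cannot parse, that alone could explain an empty sink; I haven't verified that behavior, so treat it as a lead to check.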
My expectation was that records would be aggregated per course_code over one-minute windows. Instead the sink produces no output at all, as if it never received any data. Yet when I run a plain select * from random_source, the sink prints the rows normally.
I'm out of ideas; any advice would be much appreciated. My full (problematic) code is attached below.
Thanks in advance, everyone.
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, TableEnvironment


def hello_world():
    """Read data from the random source, then write it out via the print sink."""
    settings = EnvironmentSettings.in_streaming_mode()
    # env = StreamExecutionEnvironment.get_execution_environment()
    # t_env = StreamTableEnvironment.create(env)
    t_env = TableEnvironment.create(settings)
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")

    source_ddl = """
        CREATE TABLE random_source (
            amount int,
            course_code string,
            `event_time` TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'ding_broadcast_vip_order_test',
            'properties.bootstrap.servers' = '192.168.100.135:9092',
            'properties.group.id' = 'testGroup',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """
    # also tried: `event_time` AS PROCTIME()  -- METADATA FROM 'timestamp'

    # register the source
    t_env.execute_sql(source_ddl)

    sink_ddl = """
        CREATE TABLE print_sink (
            amount BIGINT,
            course_code string,
            `event_time_start` TIMESTAMP(3),
            `event_time_end` TIMESTAMP(3)
        ) WITH (
            'connector' = 'print'
        )
    """
    # register the sink
    t_env.execute_sql(sink_ddl)

    # 1-minute tumble window, aggregated per course_code
    t_env.execute_sql("""
        insert into print_sink select
            count(amount) as amount,
            course_code,
            tumble_start(event_time, interval '1' minute) as event_time_start,
            tumble_end(event_time, interval '1' minute) as event_time_end
        from random_source
        group by tumble(event_time, interval '1' minute), course_code
    """).wait()
    # also tried:
    # tab.insert_into('print_sink')
    # tab.execute_insert('print_sink', overwrite=False).wait()
    # t_env.execute("Flink Hello World")


if __name__ == '__main__':
    hello_world()
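For what it's worth, I also tried to reason about when the windows should fire. My understanding (which may be wrong, hence this sketch rather than a claim) is that an event-time tumble window [start, start+1min) only emits once the watermark, here max(event_time) - 5s, reaches the window end, and that with 'scan.startup.mode' = 'latest-offset' only records produced after the job starts count at all. Simulating that rule over the sample timestamps above in plain Python:

```python
from collections import defaultdict
from datetime import datetime, timedelta

# event_time values from the sample records (one per minute, at :40).
times = [datetime(2021, 12, 1, 17, m, 40) for m in range(44, 52)]

WINDOW = timedelta(minutes=1)
DELAY = timedelta(seconds=5)   # WATERMARK ... - INTERVAL '5' SECOND

# Assign each record (amount = 500) to its 1-minute tumble window.
windows = defaultdict(int)
for ts in times:
    start = ts.replace(second=0, microsecond=0)
    windows[start] += 500

# A window [start, start + 1 min) emits once the watermark
# (max event_time seen, minus the 5s delay) reaches its end.
watermark = max(times) - DELAY
fired = {s: total for s, total in windows.items() if s + WINDOW <= watermark}

print(len(windows))  # 8 windows receive data
print(len(fired))    # 7 of them should have emitted by the last record
```

So by my arithmetic at least seven result rows should have appeared, which makes me suspect the data never reaches the aggregation in the first place rather than the windows simply not closing.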