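This may be a watermark problem: the parallelism is high and the test data is sparse, so some parallel subtasks receive no data and the watermark never advances.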
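To illustrate why a single subtask without data is enough to stall everything, here is a minimal pure-Python sketch (not Flink code). It assumes the downstream operator takes the minimum watermark over all of its input subtasks; the four subtasks and the event times in seconds are made up for illustration:

```python
# Sentinel for "no watermark emitted yet" (Flink starts at Long.MIN_VALUE).
NO_WATERMARK = float("-inf")


def combined_watermark(subtask_watermarks):
    """Downstream watermark: the minimum over all upstream subtasks."""
    return min(subtask_watermarks)


# Hypothetical setup: 4 source subtasks, only subtask 0 receives records.
watermarks = [NO_WATERMARK] * 4
for event_time_s in (40, 100, 160):
    # Same rule as the DDL: watermark = event time - 5 seconds.
    watermarks[0] = max(watermarks[0], event_time_s - 5)

stalled = combined_watermark(watermarks)      # all 4 subtasks considered
single = combined_watermark(watermarks[:1])   # as if parallelism were 1

print(stalled, single)
```

In this model the combined watermark stays at the sentinel while three subtasks have seen no data, so no event-time window can ever close.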
If that is the cause, there are two ways to fix it:
1) t_env.get_config().get_configuration().set_string("parallelism.default", "1")
2) t_env.get_config().get_configuration().set_string("table.exec.source.idle-timeout", "5000 ms")
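Once the watermark does advance, keep in mind that a window only fires when a later record pushes the watermark past the window end. Below is a rough pure-Python sketch of that rule applied to the eight sample records quoted further down. It assumes in-order data on a single subtask and the 5-second watermark delay from the DDL; the helper names are hypothetical, not Flink APIs:

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)   # tumbling window size from the query
DELAY = timedelta(seconds=5)    # watermark delay from the DDL
ONE_MS = timedelta(milliseconds=1)


def window_end(ts):
    """End of the one-minute tumbling window that contains ts."""
    return ts.replace(second=0, microsecond=0) + WINDOW


def simulate(event_times):
    """Return (window ends that fired, window ends still open)."""
    open_ends, fired = set(), []
    watermark = datetime.min
    for ts in event_times:
        open_ends.add(window_end(ts))
        watermark = max(watermark, ts - DELAY)
        # A window [start, end) fires once watermark >= end - 1 ms.
        closing = sorted(e for e in open_ends if watermark >= e - ONE_MS)
        open_ends.difference_update(closing)
        fired.extend(closing)
    return fired, sorted(open_ends)


# The eight sample records: 17:44:40, 17:45:40, ..., 17:51:40.
events = [datetime(2021, 12, 1, 17, 44 + i, 40) for i in range(8)]
fired, still_open = simulate(events)
print(len(fired), still_open)
```

In this model each record closes the previous minute's window, and the window ending at 17:52:00 stays open until a record with a later event time arrives.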
On Sat, Dec 4, 2021 at 6:25 PM duanzebing <[email protected]> wrote:
>
>
> >
> >
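> > Hi all,
> >
> > I am a PyFlink beginner and have run into a problem in Flink SQL where nothing reaches the sink after an aggregation. My code follows the official documentation exactly, but I still cannot solve it, so I am asking for your help.
> > My source statement is: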
> > CREATE TABLE random_source (
> > amount int,
> > course_code string,
> > `event_time` TIMESTAMP(3) ,
> > WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> > ) WITH (
> > 'connector' = 'kafka',
> > 'topic' = 'ding_broadcast_vip_order_test',
> > 'properties.bootstrap.servers' = '192.168.100.135:9092',
> > 'properties.group.id' = 'testGroup',
> > 'scan.startup.mode' = 'latest-offset',
> > 'format' = 'json'
> > )
> > The sink is:
> > CREATE TABLE print_sink (
> > amount BIGINT,
> > course_code string,
> > `event_time_start` TIMESTAMP(3),
> > `event_time_end` TIMESTAMP(3)
> > ) WITH (
> > 'connector' = 'print'
> > )
> > I have tried the following two aggregations, and both hit the same problem:
> > insert into print_sink select
> > count(amount) as amount
> > ,course_code
> > ,tumble_start(event_time, interval '1' minute) as event_time_start
> > ,tumble_end(event_time, interval '1' minute) as event_time_end
> > from random_source
> > group by tumble(event_time, interval '1' minute),course_code
> > and:
> > insert into print_sink select
> > sum(amount),course_code,window_start,window_end
> > from table(
> > tumble(
> > table random_source,
> > descriptor(event_time),
> > interval '1' minutes
> > )
> > ) group by window_start, window_end, course_code
> > My Kafka records look like this:
> > {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:44:40'}
> > {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:45:40'}
> > {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:46:40'}
> > {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:47:40'}
> > {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:48:40'}
> > {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:49:40'}
> > {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:50:40'}
> > {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:51:40'}
> >
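> > I expected one-minute windows aggregated by course_code, but the sink produces no output at all, as if no data had been received. With a plain select * from random_source, however, the sink prints normally.
> > I am out of ideas and would appreciate any advice. The code that shows the problem is attached below.
> > Thank you very much.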
> >
> > from pyflink.table import EnvironmentSettings, StreamTableEnvironment, TableEnvironment
> >
> >
> > def hello_world():
> >     """
> >     Read data from the random source and write it directly to the print sink.
> >     """
> >     settings = EnvironmentSettings.in_streaming_mode()
> >     # env = StreamExecutionEnvironment.get_execution_environment()
> >     # t_env = StreamTableEnvironment.create(s_env)
> >     t_env = TableEnvironment.create(settings)
> >     t_env.get_config().get_configuration().set_string(
> >         "pipeline.jars",
> >         "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")
> >
> >     source_ddl = """
> >         CREATE TABLE random_source (
> >             amount int,
> >             course_code string,
> >             `event_time` TIMESTAMP(3) ,
> >             WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> >         ) WITH (
> >             'connector' = 'kafka',
> >             'topic' = 'ding_broadcast_vip_order_test',
> >             'properties.bootstrap.servers' = '192.168.100.135:9092',
> >             'properties.group.id' = 'testGroup',
> >             'scan.startup.mode' = 'latest-offset',
> >             'format' = 'json'
> >         )
> >     """
> >     # f_sequence_ as PROCTIME() # METADATA FROM 'timestamp'
> >     # Register the source
> >     t_env.execute_sql(source_ddl)
> >     # Extract the data
> >
> >     sink_ddl = """
> >         CREATE TABLE print_sink (
> >             amount BIGINT,
> >             course_code string,
> >             `event_time_start` TIMESTAMP(3),
> >             `event_time_end` TIMESTAMP(3)
> >         ) WITH (
> >             'connector' = 'print'
> >         )
> >     """
> >     # Register the sink
> >     t_env.execute_sql(sink_ddl)
> >
> >     t_env.execute_sql('''insert into print_sink select
> >         count(amount) as amount
> >         ,course_code
> >         ,tumble_start(event_time, interval '1' minute) as event_time_start
> >         ,tumble_end(event_time, interval '1' minute) as event_time_end
> >         from random_source
> >         group by tumble(event_time, interval '1' minute),course_code
> >     ''').wait()
> >
> >     # tab.insert_into('print_sink')
> >     # tab.execute_insert('print_sink', overwrite=False).wait()
> >     # Run the job
> >     # t_env.execute("Flink Hello World")
> >
> >
> > if __name__ == '__main__':
> >     hello_world()
> >
> >
>
>