Thanks for the pointers. I tested with your approach and can now receive data, but I have run into a new problem: output is produced only once. I sent the producer's test data in a for loop one thousand times, and my expectation was that, with
insert into print_sink select
count(amount) as amount
,max(course_code) as course_code
,tumble_start(event_time, interval '2' minute) as event_time_start
,tumble_end(event_time, interval '2' minute) as event_time_end
from random_source
group by tumble(event_time, interval '2' minute)
the count(amount) value would keep growing as Kafka messages arrived. In fact it did not: after emitting results once it stopped consuming. The console output is:
/Users/duanzebing/PycharmProjects/kaikeba/venv/bin/python
/Users/duanzebing/PycharmProjects/kaikeba/pyflink_test.py
+I[2, 97iscn4g8k, 2021-12-01T17:44, 2021-12-01T17:46]
+I[2, 97iscn4g8k, 2021-12-01T17:46, 2021-12-01T17:48]
I do not quite understand what is causing this.
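In case it matters, here is roughly how I applied your suggestion (just a sketch; both keys are exactly the ones from your reply, set on the TableEnvironment before running the insert):

    # applied right after TableEnvironment.create(settings)
    t_env.get_config().get_configuration().set_string("parallelism.default", "1")
    t_env.get_config().get_configuration().set_string(
        "table.exec.source.idle-timeout", "5000 ms")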
> On Dec 6, 2021, at 9:36 AM, Dian Fu <[email protected]> wrote:
>
> This is likely a watermark problem: the parallelism is high and the test data is small, so some parallel subtasks receive no data and the watermark does not advance.
>
> If that is the cause, there are two ways to fix it:
> 1) t_env.get_config().get_configuration().set_string("parallelism.default", "1")
> 2) t_env.get_config().get_configuration().set_string("table.exec.source.idle-timeout", "5000 ms")
>
>
> On Sat, Dec 4, 2021 at 6:25 PM duanzebing <[email protected]> wrote:
>
>>
>>
>>>
>>>
>>> Hi all,
>>>
>>> I am a PyFlink beginner and have hit a problem where an aggregation in Flink SQL produces no sink output. My code follows the official documentation exactly, yet I still cannot resolve it, so I am asking here for help.
>>> My source DDL is:
>>> CREATE TABLE random_source (
>>> amount int,
>>> course_code string,
>>> `event_time` TIMESTAMP(3) ,
>>> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = 'ding_broadcast_vip_order_test',
>>> 'properties.bootstrap.servers' = '192.168.100.135:9092',
>>> 'properties.group.id' = 'testGroup',
>>> 'scan.startup.mode' = 'latest-offset',
>>> 'format' = 'json'
>>> )
>>> The sink is:
>>> CREATE TABLE print_sink (
>>> amount BIGINT,
>>> course_code string,
>>> `event_time_start` TIMESTAMP(3),
>>> `event_time_end` TIMESTAMP(3)
>>> ) WITH (
>>> 'connector' = 'print'
>>> )
>>> I tried the two ways of aggregating below, and both run into the same problem:
>>> insert into print_sink select
>>> count(amount) as amount
>>> ,course_code
>>> ,tumble_start(event_time, interval '1' minute) as event_time_start
>>> ,tumble_end(event_time, interval '1' minute) as event_time_end
>>> from random_source
>>> group by tumble(event_time, interval '1' minute),course_code
>>> as well as:
>>> insert into print_sink select
>>> sum(amount),course_code,window_start,window_end
>>> from table(
>>> tumble(
>>> table random_source,
>>> descriptor(event_time),
>>> interval '1' minutes
>>> )
>>> ) group by window_start, window_end, course_code
>>> My Kafka messages look like this:
>>> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:44:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:45:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:46:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:47:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:48:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:49:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:50:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:51:40'}
>>>
>>> My expectation was a one-minute tumbling window aggregated by course_code, but the sink produces no output at all, as if no data had been received. Yet with a plain select * from random_source, the sink outputs normally.
>>> I am at a loss, so any advice would be much appreciated. My problematic code is attached below.
>>> Many thanks to all of you.
>>>
>>> from pyflink.table import EnvironmentSettings, StreamTableEnvironment, TableEnvironment
>>>
>>>
>>> def hello_world():
>>> """
>>> Read data from the random Source, then output it directly via the PrintSink.
>>> """
>>> settings = EnvironmentSettings.in_streaming_mode()
>>> # env = StreamExecutionEnvironment.get_execution_environment()
>>> # t_env = StreamTableEnvironment.create(s_env)
>>> t_env = TableEnvironment.create(settings)
>>> t_env.get_config().get_configuration().set_string("pipeline.jars",
>>> "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")
>>>
>>> source_ddl = """
>>> CREATE TABLE random_source (
>>> amount int,
>>> course_code string,
>>> `event_time` TIMESTAMP(3) ,
>>> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = 'ding_broadcast_vip_order_test',
>>> 'properties.bootstrap.servers' = '192.168.100.135:9092',
>>> 'properties.group.id' = 'testGroup',
>>> 'scan.startup.mode' = 'latest-offset',
>>> 'format' = 'json'
>>> )
>>> """
>>> # f_sequence_ as PROCTIME() # METADATA FROM 'timestamp'
>>> # register the source
>>> t_env.execute_sql(source_ddl)
>>> # data extraction
>>>
>>> sink_ddl = """
>>> CREATE TABLE print_sink (
>>> amount BIGINT,
>>> course_code string,
>>> `event_time_start` TIMESTAMP(3),
>>> `event_time_end` TIMESTAMP(3)
>>> ) WITH (
>>> 'connector' = 'print'
>>> )
>>> """
>>> # register the sink
>>> t_env.execute_sql(sink_ddl)
>>>
>>> t_env.execute_sql('''insert into print_sink select
>>> count(amount) as amount
>>> ,course_code
>>> ,tumble_start(event_time, interval '1' minute) as event_time_start
>>> ,tumble_end(event_time, interval '1' minute) as event_time_end
>>> from random_source
>>> group by tumble(event_time, interval '1' minute),course_code
>>> ''').wait()
>>>
>>> # tab.insert_into('print_sink')
>>> # tab.execute_insert('print_sink', overwrite=False).wait()
>>> # run the job
>>> # t_env.execute("Flink Hello World")
>>>
>>>
>>> if __name__ == '__main__':
>>> hello_world()
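
P.S. The producer loop I mentioned at the top looks roughly like this (just a sketch, assuming the kafka-python client; topic, broker and message shape match the DDL and samples quoted above):

    import json
    from datetime import datetime, timedelta

    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='192.168.100.135:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8'))

    base = datetime(2021, 12, 1, 17, 44, 40)
    for i in range(1000):
        producer.send('ding_broadcast_vip_order_test', {
            'amount': 500,
            'course_code': '97iscn4g8k',
            # event_time advances 10 seconds per message
            'event_time': (base + timedelta(seconds=10 * i)).strftime('%Y-%m-%d %H:%M:%S'),
        })
    producer.flush()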