Thanks for the pointers. After applying your suggestion I can now receive data, but I have run into a new problem: output is produced only once. I sent the producer test data in a for loop 1,000 times. The query is:
insert into print_sink
select count(amount) as amount
      ,max(course_code) as course_code
      ,tumble_start(event_time, interval '2' minute) as event_time_start
      ,tumble_end(event_time, interval '2' minute) as event_time_end
from random_source
group by tumble(event_time, interval '2' minute)
My expectation was that count(amount) would keep growing as Kafka messages arrived, but in fact it did not: the job emitted results once and then stopped receiving. The console output is:
/Users/duanzebing/PycharmProjects/kaikeba/venv/bin/python 
/Users/duanzebing/PycharmProjects/kaikeba/pyflink_test.py
+I[2, 97iscn4g8k, 2021-12-01T17:44, 2021-12-01T17:46]
+I[2, 97iscn4g8k, 2021-12-01T17:46, 2021-12-01T17:48]

I don't quite understand what is causing this.
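
For reference, my producer loop looks roughly like the minimal sketch below. I am assuming the kafka-python client here, and the advancing timestamps are an assumption of this sketch; the topic and payload fields match the ones above:

    import json
    from datetime import datetime, timedelta

    from kafka import KafkaProducer  # kafka-python; any JSON producer would do

    producer = KafkaProducer(
        bootstrap_servers='192.168.100.135:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )

    base = datetime(2021, 12, 1, 17, 44, 40)
    for i in range(1000):
        # In this sketch event_time advances one second per message; if every
        # message carried the same event_time, the watermark could never pass
        # the end of the first window.
        payload = {
            'amount': 500,
            'course_code': '97iscn4g8k',
            'event_time': (base + timedelta(seconds=i)).strftime('%Y-%m-%d %H:%M:%S'),
        }
        producer.send('ding_broadcast_vip_order_test', payload)

    producer.flush()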

> On Dec 6, 2021, at 9:36 AM, Dian Fu <[email protected]> wrote:
> 
> It may be a watermark problem: with high parallelism and little test data, some parallel source instances receive no data, so the watermark does not advance.
> 
> If that is the cause, there are two ways to fix it:
> 1) t_env.get_config().get_configuration().set_string("parallelism.default", "1")
> 2) t_env.get_config().get_configuration().set_string("table.exec.source.idle-timeout", "5000 ms")
> 
> 
> On Sat, Dec 4, 2021 at 6:25 PM duanzebing <[email protected]> wrote:
> 
>>> 
>>> Hi all,
>>> 
>>> I am a PyFlink beginner and have hit a problem where an aggregation in Flink SQL produces no sink output. My code follows the official documentation exactly, yet I still cannot solve it, so I am turning to you all for help.
>>> My source statement is:
>>> CREATE TABLE random_source (
>>>    amount int,
>>>    course_code string,
>>>    `event_time` TIMESTAMP(3),
>>>    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
>>> ) WITH (
>>>    'connector' = 'kafka',
>>>    'topic' = 'ding_broadcast_vip_order_test',
>>>    'properties.bootstrap.servers' = '192.168.100.135:9092',
>>>    'properties.group.id' = 'testGroup',
>>>    'scan.startup.mode' = 'latest-offset',
>>>    'format' = 'json'
>>> )
>>> and the sink is:
>>> CREATE TABLE print_sink (
>>>    amount BIGINT,
>>>    course_code string,
>>>    `event_time_start` TIMESTAMP(3),
>>>    `event_time_end` TIMESTAMP(3)
>>> ) WITH (
>>>    'connector' = 'print'
>>> )
>>> I have tried the following two ways of aggregating, and both run into the same problem:
>>> insert into print_sink
>>> select count(amount) as amount
>>>       ,course_code
>>>       ,tumble_start(event_time, interval '1' minute) as event_time_start
>>>       ,tumble_end(event_time, interval '1' minute) as event_time_end
>>> from random_source
>>> group by tumble(event_time, interval '1' minute), course_code
>>> and:
>>> insert into print_sink
>>> select sum(amount), course_code, window_start, window_end
>>> from table(
>>>     tumble(
>>>         table random_source,
>>>         descriptor(event_time),
>>>         interval '1' minutes
>>>     )
>>> )
>>> group by window_start, window_end, course_code
>>> My Kafka messages are structured like this:
>>> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:44:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:45:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:46:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:47:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:48:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:49:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:50:40'}
>>> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:51:40'}
>>> 
>>> My expectation was that it would use one-minute windows and aggregate by course_code, but the sink produces no output at all, as if it had never received any data. Yet with a plain select * from random_source, the sink output is fine.
>>>      I am at a loss, so I would be grateful for any advice. My problematic code is attached below.
>>>      Many thanks to you all.
>>> 
>>> from pyflink.table import EnvironmentSettings, StreamTableEnvironment, TableEnvironment
>>> 
>>> 
>>> def hello_world():
>>>    """
>>>        Read data from the random source, then output it directly via the print sink.
>>>    """
>>>    settings = EnvironmentSettings.in_streaming_mode()
>>>    # env = StreamExecutionEnvironment.get_execution_environment()
>>>    # t_env = StreamTableEnvironment.create(s_env)
>>>    t_env = TableEnvironment.create(settings)
>>>    t_env.get_config().get_configuration().set_string(
>>>        "pipeline.jars",
>>>        "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")
>>> 
>>>    source_ddl = """
>>>        CREATE TABLE random_source (
>>>            amount int,
>>>            course_code string,
>>>            `event_time` TIMESTAMP(3),
>>>            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
>>>        ) WITH (
>>>            'connector' = 'kafka',
>>>            'topic' = 'ding_broadcast_vip_order_test',
>>>            'properties.bootstrap.servers' = '192.168.100.135:9092',
>>>            'properties.group.id' = 'testGroup',
>>>            'scan.startup.mode' = 'latest-offset',
>>>            'format' = 'json'
>>>        )
>>>        """
>>>    #    f_sequence_ as PROCTIME()  # METADATA FROM 'timestamp'
>>>    # register the source
>>>    t_env.execute_sql(source_ddl)
>>>    # extract the data
>>> 
>>>    sink_ddl = """
>>>        CREATE TABLE print_sink (
>>>            amount BIGINT,
>>>            course_code string,
>>>            `event_time_start` TIMESTAMP(3),
>>>            `event_time_end` TIMESTAMP(3)
>>>        ) WITH (
>>>            'connector' = 'print'
>>>        )
>>>        """
>>>    # register the sink
>>>    t_env.execute_sql(sink_ddl)
>>> 
>>>    t_env.execute_sql('''insert into print_sink
>>>                         select count(amount) as amount
>>>                               ,course_code
>>>                               ,tumble_start(event_time, interval '1' minute) as event_time_start
>>>                               ,tumble_end(event_time, interval '1' minute) as event_time_end
>>>                         from random_source
>>>                         group by tumble(event_time, interval '1' minute), course_code
>>>                         ''').wait()
>>> 
>>>    # tab.insert_into('print_sink')
>>>    # tab.execute_insert('print_sink', overwrite=False).wait()
>>>    # run the job
>>>    # t_env.execute("Flink Hello World")
>>> 
>>> 
>>> if __name__ == '__main__':
>>>    hello_world()
