Hi Zebing,

You can open the Flink web UI at localhost:8081 and check whether the job is actually running. Also, the data written to Kafka must be valid JSON, which means strings in double quotes.
For example:
{"amount": 500, "course_code": "97iscn4g8k","event_time":"2021-12-01 17:54:41"}

Window aggregation also depends on watermark progress: a window only emits its result once the watermark passes the window end.
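Concretely: with your `event_time - INTERVAL '5' SECOND` watermark, a 1-minute tumbling window fires only after an event arrives whose timestamp, minus 5 seconds, reaches the window end; with `latest-offset` and no new records, the watermark never advances and nothing is printed. A sketch of that arithmetic in plain Python (illustrative only, not Flink's internal implementation):

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(seconds=5)  # from: event_time - INTERVAL '5' SECOND
WINDOW_SIZE = timedelta(minutes=1)      # from: tumble(..., interval '1' minute)

def window_end(event_time):
    """End of the 1-minute tumbling window containing event_time."""
    start = event_time.replace(second=0, microsecond=0)
    return start + WINDOW_SIZE

def window_fires(win_end, max_event_time_seen):
    """A window emits its result once the watermark reaches its end."""
    watermark = max_event_time_seen - WATERMARK_DELAY
    return watermark >= win_end

# The window holding the 17:44:40 event ends at 17:45:00.
end = window_end(datetime(2021, 12, 1, 17, 44, 40))

# An event at 17:45:04 leaves the watermark at 17:44:59 -> window stays open.
too_early = datetime(2021, 12, 1, 17, 45, 4)
# An event at 17:45:40 moves the watermark to 17:45:35 -> window can fire.
late_enough = datetime(2021, 12, 1, 17, 45, 40)
```

In other words, after the last record each window needs at least one later record (past its end plus the 5-second delay) before its result can appear in the sink.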
________________________________
From: duanzebing <[email protected]>
Sent: December 4, 2021 18:14
To: [email protected] <[email protected]>
Subject: apache-flink - aggregate function produces no output in pyflink 1.14

Hi all,

I am a PyFlink beginner and I have run into a problem where a Flink SQL aggregation never reaches the sink. My code follows the official documentation exactly, yet I still cannot solve it, so I am turning to you for help.
My source statement is:
CREATE TABLE random_source (
    amount INT,
    course_code STRING,
    `event_time` TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'ding_broadcast_vip_order_test',
    'properties.bootstrap.servers' = '192.168.100.135:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)
The sink is:
CREATE TABLE print_sink (
    amount BIGINT,
    course_code STRING,
    `event_time_start` TIMESTAMP(3),
    `event_time_end` TIMESTAMP(3)
) WITH (
    'connector' = 'print'
)
I have tried the following two ways of aggregating, but both hit the same problem:
insert into print_sink select
    count(amount) as amount,
    course_code,
    tumble_start(event_time, interval '1' minute) as event_time_start,
    tumble_end(event_time, interval '1' minute) as event_time_end
from random_source
group by tumble(event_time, interval '1' minute), course_code
and:
insert into print_sink select
    sum(amount), course_code, window_start, window_end
from table(
    tumble(
        table random_source,
        descriptor(event_time),
        interval '1' minutes
    )
) group by window_start, window_end, course_code
My Kafka data looks like this:
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:44:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:45:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:46:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:47:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:48:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:49:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:50:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:51:40'}
       
My expectation is that it would window per minute and aggregate by course_code, but the result is that the sink produces no output at all, as if it never received any data. Yet with just a plain select * from random_source, the sink prints rows normally.
I am at my wits' end, so please share your advice; my problematic code is attached below.
Many thanks to everyone.

from pyflink.table import EnvironmentSettings, StreamTableEnvironment, TableEnvironment


def hello_world():
    """
    Read data from the Kafka source, then output it directly via the print sink.
    """
    settings = EnvironmentSettings.in_streaming_mode()
    # env = StreamExecutionEnvironment.get_execution_environment()
    # t_env = StreamTableEnvironment.create(s_env)
    t_env = TableEnvironment.create(settings)
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")

    source_ddl = """
                        CREATE TABLE random_source (
                            amount int,
                            course_code string,
                            `event_time` TIMESTAMP(3) ,
                            WATERMARK FOR event_time AS event_time - INTERVAL 
'5' SECOND
                        ) WITH (
                            'connector' = 'kafka',
                            'topic' = 'ding_broadcast_vip_order_test',
                          'properties.bootstrap.servers' = 
'192.168.100.135:9092',
                          'properties.group.id' = 'testGroup',
                          'scan.startup.mode' = 'latest-offset',
                          'format' = 'json'
                        )
                        """
    # f_sequence_ AS PROCTIME()  # METADATA FROM 'timestamp'
    # register the source
    t_env.execute_sql(source_ddl)
    # data extraction
    sink_ddl = """
                      CREATE TABLE print_sink (
                        amount BIGINT,
                        course_code string,
                       `event_time_start` TIMESTAMP(3),
                       `event_time_end` TIMESTAMP(3)
                    ) WITH (
                      'connector' = 'print'
                    )
            """
    # register the sink
    t_env.execute_sql(sink_ddl)

    t_env.execute_sql('''
        insert into print_sink select
            count(amount) as amount,
            course_code,
            tumble_start(event_time, interval '1' minute) as event_time_start,
            tumble_end(event_time, interval '1' minute) as event_time_end
        from random_source
        group by tumble(event_time, interval '1' minute), course_code
        ''').wait()

    # tab.insert_into('print_sink')
    # tab.execute_insert('print_sink', overwrite=False).wait()
    # run the job
    # t_env.execute("Flink Hello World")


if __name__ == '__main__':
    hello_world()

