When sending data in a loop, make sure the event timestamps keep increasing; otherwise the records may be dropped as late data.
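
For example, here is a minimal producer sketch (assumptions: the
kafka-python client is available, and the broker/topic are reused from the
DDL quoted below):

from datetime import datetime, timedelta
import json
import time

from kafka import KafkaProducer  # assumption: kafka-python is installed

producer = KafkaProducer(bootstrap_servers='192.168.100.135:9092')
event_time = datetime(2021, 12, 1, 17, 44, 40)
while True:
    record = {
        'amount': 500,
        'course_code': '97iscn4g8k',
        'event_time': event_time.strftime('%Y-%m-%d %H:%M:%S'),
    }
    # json.dumps emits double-quoted JSON, which the 'json' format requires
    producer.send('ding_broadcast_vip_order_test',
                  json.dumps(record).encode('utf-8'))
    event_time += timedelta(seconds=10)  # keep event_time strictly increasing
    time.sleep(1)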

On Mon, Dec 6, 2021 at 2:11 PM su wenwen <[email protected]>
wrote:

> Hi, zebing!
>
> You can open the Flink web UI at localhost:8081 and check whether the job
> is actually running. Also, the data written to Kafka should use double
> quotes.
> Example:
> {"amount": 500, "course_code": "97iscn4g8k","event_time":"2021-12-01
> 17:54:41"}
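> (In Python, json.dumps(record) yields exactly this double-quoted form,
> while printing the dict directly gives the single-quoted repr shown in
> your data below, which the 'json' format cannot parse.)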
>
> Window aggregation also depends on the progress of the watermark.
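> For example, with WATERMARK FOR event_time AS event_time - INTERVAL '5'
> SECOND, the window [17:44:00, 17:45:00) only fires after a record with
> event_time >= 17:45:05 arrives, because the watermark has to pass the end
> of the window first. With a finite batch of records, the watermark may
> never advance far enough to close the last window.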
> ________________________________
> From: duanzebing <[email protected]>
> Sent: December 4, 2021 18:14
> To: [email protected] <[email protected]>
> Subject: apache-flink - aggregate functions produce no output in PyFlink 1.14
>
> Hi all:
>
> I am a PyFlink beginner, and I have run into a problem where a Flink SQL
> aggregation cannot sink any output. My code follows the official
> documentation exactly, yet I still could not solve it, so I can only ask
> you all for help.
> My source DDL is:
> CREATE TABLE random_source (
>     amount int,
>     course_code string,
>     `event_time` TIMESTAMP(3) ,
>     WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'ding_broadcast_vip_order_test',
>     'properties.bootstrap.servers' = '192.168.100.135:9092',
>     'properties.group.id' = 'testGroup',
>     'scan.startup.mode' = 'latest-offset',
>     'format' = 'json'
> )
> The sink is:
>   CREATE TABLE print_sink (
>     amount BIGINT,
>     course_code string,
>    `event_time_start` TIMESTAMP(3),
>    `event_time_end` TIMESTAMP(3)
> ) WITH (
>   'connector' = 'print'
> )
> I have tried the following two ways of aggregating, but both run into the
> same problem:
> insert into print_sink select
>                         count(amount) as amount
>                         ,course_code
>                         ,tumble_start(event_time, interval '1' minute) as
> event_time_start
>                         ,tumble_end(event_time, interval '1' minute) as
> event_time_end
>                     from random_source
>                     group by tumble(event_time, interval '1'
> minute),course_code
> and:
> insert into print_sink select
>                         sum(amount),course_code,window_start,window_end
>                         from table(
>                             tumble(
>                             table random_source,
>                             descriptor(event_time),
>                             interval '1' minutes
>                             )
>                         ) group by window_start, window_end, course_code
> My Kafka data looks like this:
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:44:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:45:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:46:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:47:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:48:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:49:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:50:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:51:40'}
>
>  
> I expected it to aggregate by course_code over one-minute windows, but
> the sink produces no output at all, as if it had received no data.
> However, with just a plain select * from random_source, the sink outputs
> normally.
> I am at my wits' end, so please don't hesitate to advise me. My
> problematic code is attached below.
> Many thanks to everyone.
>
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment,
> TableEnvironment
>
>
> def hello_world():
>     """
>         Read data from the random source, then output it directly through the print sink.
>     """
>     settings = EnvironmentSettings.in_streaming_mode()
>     # env = StreamExecutionEnvironment.get_execution_environment()
>     # t_env = StreamTableEnvironment.create(s_env)
>     t_env = TableEnvironment.create(settings)
>     t_env.get_config().get_configuration().set_string(
>         "pipeline.jars",
>         "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")
>
>     source_ddl = """
>                         CREATE TABLE random_source (
>                             amount int,
>                             course_code string,
>                             `event_time` TIMESTAMP(3) ,
>                             WATERMARK FOR event_time AS event_time -
> INTERVAL '5' SECOND
>                         ) WITH (
>                             'connector' = 'kafka',
>                             'topic' = 'ding_broadcast_vip_order_test',
>                           'properties.bootstrap.servers' = '192.168.100.135:9092',
>                           'properties.group.id' = 'testGroup',
>                           'scan.startup.mode' = 'latest-offset',
>                           'format' = 'json'
>                         )
>                         """
>     #     f_sequence_ as PROCTIME()  # METADATA FROM 'timestamp'
>     # register the source
>     t_env.execute_sql(source_ddl)
>     # data extraction
>     sink_ddl = """
>                       CREATE TABLE print_sink (
>                         amount BIGINT,
>                         course_code string,
>                        `event_time_start` TIMESTAMP(3),
>                        `event_time_end` TIMESTAMP(3)
>                     ) WITH (
>                       'connector' = 'print'
>                     )
>             """
>     # register the sink
>     t_env.execute_sql(sink_ddl)
>
>     t_env.execute_sql('''insert into print_sink select
>                             count(amount) as amount
>                             ,course_code
>                             ,tumble_start(event_time, interval '1' minute)
> as event_time_start
>                             ,tumble_end(event_time, interval '1' minute)
> as event_time_end
>                         from random_source
>                         group by tumble(event_time, interval '1'
> minute),course_code
>                         ''').wait()
>
>     # tab.insert_into('print_sink')
>     # tab.execute_insert('print_sink', overwrite=False).wait()
>     # run the job
>     # t_env.execute("Flink Hello World")
>
>
> if __name__ == '__main__':
>     hello_world()
>
>
>
