> 
> 
> 大家好:
>       
> 我是一个pyflink初学者,遇到一个flinksql中聚合后无法sink的问题。我的代码完全按照官方文档编写,但到最后依然无法解决,只能向各位求助。
> 我的 source语句为:
> CREATE TABLE random_source (
>     amount int,
>     course_code string,
>     `event_time` TIMESTAMP(3) ,
>     WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'ding_broadcast_vip_order_test',
>   'properties.bootstrap.servers' = '192.168.100.135:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> )
> sink为:
>   CREATE TABLE print_sink (
>     amount BIGINT,
>     course_code string,
>    `event_time_start` TIMESTAMP(3),
>    `event_time_end` TIMESTAMP(3)
> ) WITH (
>   'connector' = 'print'
> )
> 而我尝试过如下两种方式聚合,可是都遇到同样的问题:
> insert into print_sink select 
>                         count(amount) as amount
>                         ,course_code
>                         ,tumble_start(event_time, interval '1' minute) as 
> event_time_start
>                         ,tumble_end(event_time, interval '1' minute) as 
> event_time_end
>                     from random_source 
>                     group by tumble(event_time, interval '1' 
> minute),course_code
> 以及:
> insert into print_sink select 
>                         sum(amount),course_code,window_start,window_end
>                         from table(
>                             tumble(
>                             table random_source,
>                             descriptor(event_time),
>                             interval '1' minutes
>                             )
>                         ) group by window_start, window_end, course_code
> 我的kafka数据结构为
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:44:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:45:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:46:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:47:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:48:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:49:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:50:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:51:40'}
>        
> 在我的预期中,它应该以每分钟作为窗口,然后按照course_code进行聚合,可结果是sink没有任何输出,仿佛没有接收到任何数据。但是在仅仅执行普通的
> select * from random_source 时,sink又能够正常输出。
>       我实在无能为力,请各位不吝赐教:下面附上我的出问题的代码。
>       非常感谢各位
> 
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment, 
> TableEnvironment
> 
> 
> def hello_world():
>     """
>         从随机Source读取数据,然后直接利用PrintSink输出。
>     """
>     settings = EnvironmentSettings.in_streaming_mode()
>     # env = StreamExecutionEnvironment.get_execution_environment()
>     # t_env = StreamTableEnvironment.create(s_env)
>     t_env = TableEnvironment.create(settings)
>     t_env.get_config().get_configuration().set_string("pipeline.jars",
>                                                       
> "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")
> 
>     source_ddl = """
>                         CREATE TABLE random_source (
>                             amount int,
>                             course_code string,
>                             `event_time` TIMESTAMP(3) ,
>                             WATERMARK FOR event_time AS event_time - INTERVAL 
> '5' SECOND
>                         ) WITH (
>                             'connector' = 'kafka',
>                             'topic' = 'ding_broadcast_vip_order_test',
>                           'properties.bootstrap.servers' = 
> '192.168.100.135:9092',
>                           'properties.group.id' = 'testGroup',
>                           'scan.startup.mode' = 'latest-offset',
>                           'format' = 'json'
>                         )
>                         """
>     #                         f_sequence_ as PROCTIME()  # METADATA FROM 
> 'timestamp'
>     # 注册source
>     t_env.execute_sql(source_ddl)
>     # 数据提取
> 
>                 """
>     sink_ddl = """
>                       CREATE TABLE print_sink (
>                         amount BIGINT,
>                         course_code string,
>                        `event_time_start` TIMESTAMP(3),
>                        `event_time_end` TIMESTAMP(3)
>                     ) WITH (
>                       'connector' = 'print'
>                     )
>             """
>     # 注册sink
>     t_env.execute_sql(sink_ddl)
> 
>     t_env.execute_sql('''insert into print_sink select 
>                             count(amount) as amount
>                             ,course_code
>                             ,tumble_start(event_time, interval '1' minute) as 
> event_time_start
>                             ,tumble_end(event_time, interval '1' minute) as 
> event_time_end
>                         from random_source 
>                         group by tumble(event_time, interval '1' 
> minute),course_code
>                         ''').wait()
> 
>     # tab.insert_into('print_sink')
>     # tab.execute_insert('print_sink', overwrite=False).wait()
>     # 执行作业
>     # t_env.execute("Flink Hello World")
> 
> 
> if __name__ == '__main__':
>     hello_world()
> 
> 

回复